OmniSciDB  dfae7c3b14
RelAlgExecutor Class Reference

#include <RelAlgExecutor.h>

+ Inheritance diagram for RelAlgExecutor:
+ Collaboration diagram for RelAlgExecutor:

Classes

struct  TableFunctionWorkUnit
 
struct  WorkUnit
 

Public Types

using TargetInfoList = std::vector< TargetInfo >
 

Public Member Functions

 RelAlgExecutor (Executor *executor, const Catalog_Namespace::Catalog &cat, std::shared_ptr< const query_state::QueryState > query_state=nullptr)
 
 RelAlgExecutor (Executor *executor, const Catalog_Namespace::Catalog &cat, const std::string &query_ra, std::shared_ptr< const query_state::QueryState > query_state=nullptr)
 
 RelAlgExecutor (Executor *executor, const Catalog_Namespace::Catalog &cat, std::unique_ptr< RelAlgDagBuilder > query_dag, std::shared_ptr< const query_state::QueryState > query_state=nullptr)
 
size_t getOuterFragmentCount (const CompilationOptions &co, const ExecutionOptions &eo)
 
ExecutionResult executeRelAlgQuery (const CompilationOptions &co, const ExecutionOptions &eo, const bool just_explain_plan, RenderInfo *render_info)
 
ExecutionResult executeRelAlgQueryWithFilterPushDown (const RaExecutionSequence &seq, const CompilationOptions &co, const ExecutionOptions &eo, RenderInfo *render_info, const int64_t queue_time_ms)
 
void prepareLeafExecution (const AggregatedColRange &agg_col_range, const StringDictionaryGenerations &string_dictionary_generations, const TableGenerations &table_generations)
 
ExecutionResult executeRelAlgSeq (const RaExecutionSequence &seq, const CompilationOptions &co, const ExecutionOptions &eo, RenderInfo *render_info, const int64_t queue_time_ms, const bool with_existing_temp_tables=false)
 
ExecutionResult executeRelAlgSubSeq (const RaExecutionSequence &seq, const std::pair< size_t, size_t > interval, const CompilationOptions &co, const ExecutionOptions &eo, RenderInfo *render_info, const int64_t queue_time_ms)
 
FirstStepExecutionResult executeRelAlgQuerySingleStep (const RaExecutionSequence &seq, const size_t step_idx, const CompilationOptions &co, const ExecutionOptions &eo, RenderInfo *render_info)
 
void addLeafResult (const unsigned id, const AggregatedResult &result)
 
const RelAlgNode & getRootRelAlgNode () const
 
const std::vector< std::shared_ptr< RexSubQuery > > & getSubqueries () const noexcept
 
const QueryHint getParsedQueryHints () const
 
ExecutionResult executeSimpleInsert (const Analyzer::Query &insert_query)
 
AggregatedColRange computeColRangesCache ()
 
StringDictionaryGenerations computeStringDictionaryGenerations ()
 
TableGenerations computeTableGenerations ()
 
Executor * getExecutor () const
 
void cleanupPostExecution ()
 

Static Public Member Functions

static std::string getErrorMessageFromCode (const int32_t error_code)
 

Private Member Functions

ExecutionResult executeRelAlgQueryNoRetry (const CompilationOptions &co, const ExecutionOptions &eo, const bool just_explain_plan, RenderInfo *render_info)
 
void executeRelAlgStep (const RaExecutionSequence &seq, const size_t step_idx, const CompilationOptions &, const ExecutionOptions &, RenderInfo *, const int64_t queue_time_ms)
 
void executeUpdate (const RelAlgNode *node, const CompilationOptions &co, const ExecutionOptions &eo, const int64_t queue_time_ms)
 
void executeDelete (const RelAlgNode *node, const CompilationOptions &co, const ExecutionOptions &eo_in, const int64_t queue_time_ms)
 
ExecutionResult executeCompound (const RelCompound *, const CompilationOptions &, const ExecutionOptions &, RenderInfo *, const int64_t queue_time_ms)
 
ExecutionResult executeAggregate (const RelAggregate *aggregate, const CompilationOptions &co, const ExecutionOptions &eo, RenderInfo *render_info, const int64_t queue_time_ms)
 
ExecutionResult executeProject (const RelProject *, const CompilationOptions &, const ExecutionOptions &, RenderInfo *, const int64_t queue_time_ms, const std::optional< size_t > previous_count)
 
ExecutionResult executeTableFunction (const RelTableFunction *, const CompilationOptions &, const ExecutionOptions &, const int64_t queue_time_ms)
 
void computeWindow (const RelAlgExecutionUnit &ra_exe_unit, const CompilationOptions &co, const ExecutionOptions &eo, ColumnCacheMap &column_cache_map, const int64_t queue_time_ms)
 
std::unique_ptr< WindowFunctionContext > createWindowFunctionContext (const Analyzer::WindowFunction *window_func, const std::shared_ptr< Analyzer::BinOper > &partition_key_cond, const RelAlgExecutionUnit &ra_exe_unit, const std::vector< InputTableInfo > &query_infos, const CompilationOptions &co, ColumnCacheMap &column_cache_map, std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner)
 
ExecutionResult executeFilter (const RelFilter *, const CompilationOptions &, const ExecutionOptions &, RenderInfo *, const int64_t queue_time_ms)
 
ExecutionResult executeSort (const RelSort *, const CompilationOptions &, const ExecutionOptions &, RenderInfo *, const int64_t queue_time_ms)
 
ExecutionResult executeLogicalValues (const RelLogicalValues *, const ExecutionOptions &)
 
ExecutionResult executeModify (const RelModify *modify, const ExecutionOptions &eo)
 
ExecutionResult executeUnion (const RelLogicalUnion *, const RaExecutionSequence &, const CompilationOptions &, const ExecutionOptions &, RenderInfo *, const int64_t queue_time_ms)
 
WorkUnit createSortInputWorkUnit (const RelSort *, const ExecutionOptions &eo)
 
ExecutionResult executeWorkUnit (const WorkUnit &work_unit, const std::vector< TargetMetaInfo > &targets_meta, const bool is_agg, const CompilationOptions &co_in, const ExecutionOptions &eo, RenderInfo *, const int64_t queue_time_ms, const std::optional< size_t > previous_count=std::nullopt)
 
size_t getNDVEstimation (const WorkUnit &work_unit, const int64_t range, const bool is_agg, const CompilationOptions &co, const ExecutionOptions &eo)
 
std::optional< size_t > getFilteredCountAll (const WorkUnit &work_unit, const bool is_agg, const CompilationOptions &co, const ExecutionOptions &eo)
 
FilterSelectivity getFilterSelectivity (const std::vector< std::shared_ptr< Analyzer::Expr >> &filter_expressions, const CompilationOptions &co, const ExecutionOptions &eo)
 
std::vector< PushedDownFilterInfo > selectFiltersToBePushedDown (const RelAlgExecutor::WorkUnit &work_unit, const CompilationOptions &co, const ExecutionOptions &eo)
 
bool isRowidLookup (const WorkUnit &work_unit)
 
ExecutionResult handleOutOfMemoryRetry (const RelAlgExecutor::WorkUnit &work_unit, const std::vector< TargetMetaInfo > &targets_meta, const bool is_agg, const CompilationOptions &co, const ExecutionOptions &eo, RenderInfo *render_info, const bool was_multifrag_kernel_launch, const int64_t queue_time_ms)
 
WorkUnit createWorkUnit (const RelAlgNode *, const SortInfo &, const ExecutionOptions &eo)
 
WorkUnit createCompoundWorkUnit (const RelCompound *, const SortInfo &, const ExecutionOptions &eo)
 
WorkUnit createAggregateWorkUnit (const RelAggregate *, const SortInfo &, const bool just_explain)
 
WorkUnit createProjectWorkUnit (const RelProject *, const SortInfo &, const ExecutionOptions &eo)
 
WorkUnit createFilterWorkUnit (const RelFilter *, const SortInfo &, const bool just_explain)
 
WorkUnit createJoinWorkUnit (const RelJoin *, const SortInfo &, const bool just_explain)
 
WorkUnit createUnionWorkUnit (const RelLogicalUnion *, const SortInfo &, const ExecutionOptions &eo)
 
TableFunctionWorkUnit createTableFunctionWorkUnit (const RelTableFunction *table_func, const bool just_explain, const bool is_gpu)
 
void addTemporaryTable (const int table_id, const ResultSetPtr &result)
 
void eraseFromTemporaryTables (const int table_id)
 
void handleNop (RaExecutionDesc &ed)
 
JoinQualsPerNestingLevel translateLeftDeepJoinFilter (const RelLeftDeepInnerJoin *join, const std::vector< InputDescriptor > &input_descs, const std::unordered_map< const RelAlgNode *, int > &input_to_nest_level, const bool just_explain)
 
std::list< std::shared_ptr< Analyzer::Expr > > makeJoinQuals (const RexScalar *join_condition, const std::vector< JoinType > &join_types, const std::unordered_map< const RelAlgNode *, int > &input_to_nest_level, const bool just_explain) const
 
- Private Member Functions inherited from StorageIOFacility< RelAlgExecutorTraits >
 StorageIOFacility (ExecutorType *executor, CatalogType const &catalog)
 
UpdateCallback yieldUpdateCallback (UpdateTransactionParameters &update_parameters)
 
UpdateCallback yieldDeleteCallback (DeleteTransactionParameters &delete_parameters)
 

Static Private Member Functions

static void handlePersistentError (const int32_t error_code)
 

Private Attributes

Executor * executor_
 
const Catalog_Namespace::Catalog & cat_
 
std::unique_ptr< RelAlgDagBuilder > query_dag_
 
std::shared_ptr< const query_state::QueryState > query_state_
 
TemporaryTables temporary_tables_
 
time_t now_
 
std::vector< std::shared_ptr< Analyzer::Expr > > target_exprs_owned_
 
std::unordered_map< unsigned, AggregatedResult > leaf_results_
 
int64_t queue_time_ms_
 

Static Private Attributes

static SpeculativeTopNBlacklist speculative_topn_blacklist_
 
static const size_t max_groups_buffer_entry_default_guess {16384}
 

Friends

class PendingExecutionClosure
 

Additional Inherited Members

- Private Types inherited from StorageIOFacility< RelAlgExecutorTraits >
using ExecutorType = typename RelAlgExecutorTraits::ExecutorType
 
using CatalogType = typename RelAlgExecutorTraits::CatalogType
 
using FragmentUpdaterType = UpdateLogForFragment
 
using UpdateCallback = typename FragmentUpdaterType::Callback
 
using TableDescriptorType = typename RelAlgExecutorTraits::TableDescriptorType
 
using DeleteVictimOffsetList = std::vector< uint64_t >
 
using UpdateTargetOffsetList = std::vector< uint64_t >
 
using UpdateTargetTypeList = std::vector< TargetMetaInfo >
 
using UpdateTargetColumnNamesList = std::vector< std::string >
 
using FragmenterType = Fragmenter_Namespace::InsertOrderFragmenter
 
using TransactionLog = typename FragmenterType::ModifyTransactionTracker
 
using TransactionLogPtr = std::unique_ptr< TransactionLog >
 
using ColumnValidationFunction = std::function< bool(std::string const &)>
 

Detailed Description

Definition at line 54 of file RelAlgExecutor.h.

Member Typedef Documentation

◆ TargetInfoList

Definition at line 56 of file RelAlgExecutor.h.

Constructor & Destructor Documentation

◆ RelAlgExecutor() [1/3]

RelAlgExecutor::RelAlgExecutor ( Executor * executor,
const Catalog_Namespace::Catalog & cat,
std::shared_ptr< const query_state::QueryState > query_state = nullptr 
)
inline

Definition at line 58 of file RelAlgExecutor.h.

61  : StorageIOFacility(executor, cat)
62  , executor_(executor)
63  , cat_(cat)
64  , query_state_(std::move(query_state))
65  , now_(0)
66  , queue_time_ms_(0) {}
StorageIOFacility(ExecutorType *executor, CatalogType const &catalog)
int64_t queue_time_ms_
const Catalog_Namespace::Catalog & cat_
std::shared_ptr< const query_state::QueryState > query_state_
Executor * executor_

◆ RelAlgExecutor() [2/3]

RelAlgExecutor::RelAlgExecutor ( Executor * executor,
const Catalog_Namespace::Catalog & cat,
const std::string & query_ra,
std::shared_ptr< const query_state::QueryState > query_state = nullptr 
)
inline

Definition at line 68 of file RelAlgExecutor.h.

72  : StorageIOFacility(executor, cat)
73  , executor_(executor)
74  , cat_(cat)
75  , query_dag_(std::make_unique<RelAlgDagBuilder>(query_ra, cat_, nullptr))
76  , query_state_(std::move(query_state))
77  , now_(0)
78  , queue_time_ms_(0) {}
StorageIOFacility(ExecutorType *executor, CatalogType const &catalog)
int64_t queue_time_ms_
const Catalog_Namespace::Catalog & cat_
std::shared_ptr< const query_state::QueryState > query_state_
std::unique_ptr< RelAlgDagBuilder > query_dag_
Executor * executor_

◆ RelAlgExecutor() [3/3]

RelAlgExecutor::RelAlgExecutor ( Executor * executor,
const Catalog_Namespace::Catalog & cat,
std::unique_ptr< RelAlgDagBuilder > query_dag,
std::shared_ptr< const query_state::QueryState > query_state = nullptr 
)
inline

Definition at line 80 of file RelAlgExecutor.h.

84  : StorageIOFacility(executor, cat)
85  , executor_(executor)
86  , cat_(cat)
87  , query_dag_(std::move(query_dag))
88  , query_state_(std::move(query_state))
89  , now_(0)
90  , queue_time_ms_(0) {}
StorageIOFacility(ExecutorType *executor, CatalogType const &catalog)
int64_t queue_time_ms_
const Catalog_Namespace::Catalog & cat_
std::shared_ptr< const query_state::QueryState > query_state_
std::unique_ptr< RelAlgDagBuilder > query_dag_
Executor * executor_

Member Function Documentation

◆ addLeafResult()

void RelAlgExecutor::addLeafResult ( const unsigned id,
const AggregatedResult & result 
)
inline

Definition at line 130 of file RelAlgExecutor.h.

References CHECK.

130  {
131  const auto it_ok = leaf_results_.emplace(id, result);
132  CHECK(it_ok.second);
133  }
std::unordered_map< unsigned, AggregatedResult > leaf_results_
#define CHECK(condition)
Definition: Logger.h:197

◆ addTemporaryTable()

void RelAlgExecutor::addTemporaryTable ( const int table_id,
const ResultSetPtr & result 
)
inlineprivate

Definition at line 339 of file RelAlgExecutor.h.

References CHECK, and CHECK_LT.

339  {
340  CHECK_LT(size_t(0), result->colCount());
341  CHECK_LT(table_id, 0);
342  const auto it_ok = temporary_tables_.emplace(table_id, result);
343  CHECK(it_ok.second);
344  }
TemporaryTables temporary_tables_
#define CHECK_LT(x, y)
Definition: Logger.h:207
#define CHECK(condition)
Definition: Logger.h:197

◆ cleanupPostExecution()

void RelAlgExecutor::cleanupPostExecution ( )

Definition at line 396 of file RelAlgExecutor.cpp.

References CHECK, and ResultSet::executor_.

396  {
397  CHECK(executor_);
398  executor_->row_set_mem_owner_ = nullptr;
399  executor_->lit_str_dict_proxy_ = nullptr;
400 }
#define CHECK(condition)
Definition: Logger.h:197
Executor * executor_

◆ computeColRangesCache()

AggregatedColRange RelAlgExecutor::computeColRangesCache ( )

Definition at line 376 of file RelAlgExecutor.cpp.

References ResultSet::executor_, and anonymous_namespace{RelAlgExecutor.cpp}::get_physical_inputs().

376  {
377  AggregatedColRange agg_col_range_cache;
378  const auto phys_inputs = get_physical_inputs(cat_, &getRootRelAlgNode());
379  return executor_->computeColRangesCache(phys_inputs);
380 }
const Catalog_Namespace::Catalog & cat_
const RelAlgNode & getRootRelAlgNode() const
std::unordered_set< PhysicalInput > get_physical_inputs(const Catalog_Namespace::Catalog &cat, const RelAlgNode *ra)
Executor * executor_
+ Here is the call graph for this function:

◆ computeStringDictionaryGenerations()

StringDictionaryGenerations RelAlgExecutor::computeStringDictionaryGenerations ( )

Definition at line 382 of file RelAlgExecutor.cpp.

References ResultSet::executor_, and anonymous_namespace{RelAlgExecutor.cpp}::get_physical_inputs().

382  {
383  const auto phys_inputs = get_physical_inputs(cat_, &getRootRelAlgNode());
384  return executor_->computeStringDictionaryGenerations(phys_inputs);
385 }
const Catalog_Namespace::Catalog & cat_
const RelAlgNode & getRootRelAlgNode() const
std::unordered_set< PhysicalInput > get_physical_inputs(const Catalog_Namespace::Catalog &cat, const RelAlgNode *ra)
Executor * executor_
+ Here is the call graph for this function:

◆ computeTableGenerations()

TableGenerations RelAlgExecutor::computeTableGenerations ( )

Definition at line 387 of file RelAlgExecutor.cpp.

References ResultSet::executor_, and get_physical_table_inputs().

387  {
388  const auto phys_table_ids = get_physical_table_inputs(&getRootRelAlgNode());
389  return executor_->computeTableGenerations(phys_table_ids);
390 }
const RelAlgNode & getRootRelAlgNode() const
Executor * executor_
std::unordered_set< int > get_physical_table_inputs(const RelAlgNode *ra)
+ Here is the call graph for this function:

◆ computeWindow()

void RelAlgExecutor::computeWindow ( const RelAlgExecutionUnit & ra_exe_unit,
const CompilationOptions & co,
const ExecutionOptions & eo,
ColumnCacheMap & column_cache_map,
const int64_t  queue_time_ms 
)
private

Definition at line 1722 of file RelAlgExecutor.cpp.

References CHECK_EQ, WindowProjectNodeContext::create(), ResultSet::executor_, ExecutionOptions::executor_type, Extern, get_table_infos(), Analyzer::WindowFunction::getPartitionKeys(), RelAlgExecutionUnit::input_descs, kBOOLEAN, kBW_EQ, kONE, RelAlgExecutionUnit::target_exprs, and anonymous_namespace{RelAlgExecutor.cpp}::transform_to_inner().

1726  {
1727  auto query_infos = get_table_infos(ra_exe_unit.input_descs, executor_);
1728  CHECK_EQ(query_infos.size(), size_t(1));
1729  if (query_infos.front().info.fragments.size() != 1) {
1730  throw std::runtime_error(
1731  "Only single fragment tables supported for window functions for now");
1732  }
1733  if (eo.executor_type == ::ExecutorType::Extern) {
1734  return;
1735  }
1736  query_infos.push_back(query_infos.front());
1737  auto window_project_node_context = WindowProjectNodeContext::create(executor_);
1738  for (size_t target_index = 0; target_index < ra_exe_unit.target_exprs.size();
1739  ++target_index) {
1740  const auto& target_expr = ra_exe_unit.target_exprs[target_index];
1741  const auto window_func = dynamic_cast<const Analyzer::WindowFunction*>(target_expr);
1742  if (!window_func) {
1743  continue;
1744  }
1745  // Always use baseline layout hash tables for now, make the expression a tuple.
1746  const auto& partition_keys = window_func->getPartitionKeys();
1747  std::shared_ptr<Analyzer::Expr> partition_key_tuple;
1748  if (partition_keys.size() > 1) {
1749  partition_key_tuple = makeExpr<Analyzer::ExpressionTuple>(partition_keys);
1750  } else {
1751  if (partition_keys.empty()) {
1752  throw std::runtime_error(
1753  "Empty window function partitions are not supported yet");
1754  }
1755  CHECK_EQ(partition_keys.size(), size_t(1));
1756  partition_key_tuple = partition_keys.front();
1757  }
1758  // Creates a tautology equality with the partition expression on both sides.
1759  const auto partition_key_cond =
1760  makeExpr<Analyzer::BinOper>(kBOOLEAN,
1761  kBW_EQ,
1762  kONE,
1763  partition_key_tuple,
1764  transform_to_inner(partition_key_tuple.get()));
1765  auto context = createWindowFunctionContext(window_func,
1766  partition_key_cond,
1767  ra_exe_unit,
1768  query_infos,
1769  co,
1770  column_cache_map,
1771  executor_->getRowSetMemoryOwner());
1772  context->compute();
1773  window_project_node_context->addWindowFunctionContext(std::move(context),
1774  target_index);
1775  }
1776 }
std::vector< Analyzer::Expr * > target_exprs
#define CHECK_EQ(x, y)
Definition: Logger.h:205
std::vector< InputDescriptor > input_descs
static WindowProjectNodeContext * create(Executor *executor)
std::shared_ptr< Analyzer::Expr > transform_to_inner(const Analyzer::Expr *expr)
ExecutorType executor_type
Definition: sqldefs.h:69
const std::vector< std::shared_ptr< Analyzer::Expr > > & getPartitionKeys() const
Definition: Analyzer.h:1451
std::vector< InputTableInfo > get_table_infos(const std::vector< InputDescriptor > &input_descs, Executor *executor)
Definition: sqldefs.h:31
Executor * executor_
std::unique_ptr< WindowFunctionContext > createWindowFunctionContext(const Analyzer::WindowFunction *window_func, const std::shared_ptr< Analyzer::BinOper > &partition_key_cond, const RelAlgExecutionUnit &ra_exe_unit, const std::vector< InputTableInfo > &query_infos, const CompilationOptions &co, ColumnCacheMap &column_cache_map, std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner)
+ Here is the call graph for this function:

◆ createAggregateWorkUnit()

RelAlgExecutor::WorkUnit RelAlgExecutor::createAggregateWorkUnit ( const RelAggregate * aggregate,
const SortInfo & sort_info,
const bool  just_explain 
)
private

Definition at line 3552 of file RelAlgExecutor.cpp.

References CHECK_EQ, ResultSet::executor_, anonymous_namespace{RelAlgExecutor.cpp}::get_input_desc(), anonymous_namespace{RelAlgExecutor.cpp}::get_input_nest_levels(), anonymous_namespace{RelAlgExecutor.cpp}::get_join_type(), anonymous_namespace{RelAlgExecutor.cpp}::get_targets_meta(), RelAlgNode::getInput(), RelAlgNode::getOutputMetainfo(), RelAlgNode::inputCount(), RelAlgNode::setOutputMetainfo(), anonymous_namespace{RelAlgExecutor.cpp}::synthesize_inputs(), anonymous_namespace{RelAlgExecutor.cpp}::translate_groupby_exprs(), and anonymous_namespace{RelAlgExecutor.cpp}::translate_targets().

3555  {
3556  std::vector<InputDescriptor> input_descs;
3557  std::list<std::shared_ptr<const InputColDescriptor>> input_col_descs;
3558  std::vector<std::shared_ptr<RexInput>> used_inputs_owned;
3559  const auto input_to_nest_level = get_input_nest_levels(aggregate, {});
3560  std::tie(input_descs, input_col_descs, used_inputs_owned) =
3561  get_input_desc(aggregate, input_to_nest_level, {}, cat_);
3562  const auto join_type = get_join_type(aggregate);
3563 
3564  RelAlgTranslator translator(cat_,
3565  query_state_,
3566  executor_,
3567  input_to_nest_level,
3568  {join_type},
3569  now_,
3570  just_explain);
3571  CHECK_EQ(size_t(1), aggregate->inputCount());
3572  const auto source = aggregate->getInput(0);
3573  const auto& in_metainfo = source->getOutputMetainfo();
3574  const auto scalar_sources =
3575  synthesize_inputs(aggregate, size_t(0), in_metainfo, input_to_nest_level);
3576  const auto groupby_exprs = translate_groupby_exprs(aggregate, scalar_sources);
3577  const auto target_exprs = translate_targets(
3578  target_exprs_owned_, scalar_sources, groupby_exprs, aggregate, translator);
3579  const auto targets_meta = get_targets_meta(aggregate, target_exprs);
3580  aggregate->setOutputMetainfo(targets_meta);
3581  return {RelAlgExecutionUnit{input_descs,
3582  input_col_descs,
3583  {},
3584  {},
3585  {},
3586  groupby_exprs,
3587  target_exprs,
3588  nullptr,
3589  sort_info,
3590  0,
3591  false,
3592  std::nullopt,
3593  query_state_},
3594  aggregate,
3595  max_groups_buffer_entry_default_guess,
3596  nullptr};
3597 }
#define CHECK_EQ(x, y)
Definition: Logger.h:205
std::list< std::shared_ptr< Analyzer::Expr > > translate_groupby_exprs(const RelAggregate *aggregate, const std::vector< std::shared_ptr< Analyzer::Expr >> &scalar_sources)
std::unordered_map< const RelAlgNode *, int > get_input_nest_levels(const RelAlgNode *ra_node, const std::vector< size_t > &input_permutation)
std::vector< std::shared_ptr< Analyzer::Expr > > synthesize_inputs(const RelAlgNode *ra_node, const size_t nest_level, const std::vector< TargetMetaInfo > &in_metainfo, const std::unordered_map< const RelAlgNode *, int > &input_to_nest_level)
static const size_t max_groups_buffer_entry_default_guess
std::vector< std::shared_ptr< Analyzer::Expr > > target_exprs_owned_
void setOutputMetainfo(const std::vector< TargetMetaInfo > &targets_metainfo) const
const Catalog_Namespace::Catalog & cat_
JoinType get_join_type(const RelAlgNode *ra)
std::vector< TargetMetaInfo > get_targets_meta(const RelFilter *filter, const std::vector< Analyzer::Expr *> &target_exprs)
const std::vector< TargetMetaInfo > & getOutputMetainfo() const
std::shared_ptr< const query_state::QueryState > query_state_
const size_t inputCount() const
const RelAlgNode * getInput(const size_t idx) const
std::tuple< std::vector< InputDescriptor >, std::list< std::shared_ptr< const InputColDescriptor > >, std::vector< std::shared_ptr< RexInput > > > get_input_desc(const RA *ra_node, const std::unordered_map< const RelAlgNode *, int > &input_to_nest_level, const std::vector< size_t > &input_permutation, const Catalog_Namespace::Catalog &cat)
Executor * executor_
std::vector< Analyzer::Expr * > translate_targets(std::vector< std::shared_ptr< Analyzer::Expr >> &target_exprs_owned, const std::vector< std::shared_ptr< Analyzer::Expr >> &scalar_sources, const std::list< std::shared_ptr< Analyzer::Expr >> &groupby_exprs, const RelAggregate *aggregate, const RelAlgTranslator &translator)
+ Here is the call graph for this function:

◆ createCompoundWorkUnit()

RelAlgExecutor::WorkUnit RelAlgExecutor::createCompoundWorkUnit ( const RelCompound * compound,
const SortInfo & sort_info,
const ExecutionOptions & eo 
)
private

Definition at line 3278 of file RelAlgExecutor.cpp.

References CHECK_EQ, anonymous_namespace{RelAlgExecutor.cpp}::do_table_reordering(), ResultSet::executor_, ExecutionOptions::executor_type, g_from_table_reordering, anonymous_namespace{RelAlgExecutor.cpp}::get_input_desc(), anonymous_namespace{RelAlgExecutor.cpp}::get_input_nest_levels(), anonymous_namespace{RelAlgExecutor.cpp}::get_join_type(), anonymous_namespace{RelAlgExecutor.cpp}::get_left_deep_join_input_sizes(), get_table_infos(), anonymous_namespace{RelAlgExecutor.cpp}::get_targets_meta(), RelAlgNode::getInput(), RelAlgNode::inputCount(), ExecutionOptions::just_explain, LEFT, anonymous_namespace{RelAlgExecutor.cpp}::left_deep_join_types(), shared::printContainer(), anonymous_namespace{RelAlgExecutor.cpp}::rewrite_quals(), RelAlgNode::setOutputMetainfo(), RelAlgExecutionUnit::simple_quals, RelCompound::size(), anonymous_namespace{RelAlgExecutor.cpp}::translate_groupby_exprs(), anonymous_namespace{RelAlgExecutor.cpp}::translate_quals(), anonymous_namespace{RelAlgExecutor.cpp}::translate_scalar_sources(), anonymous_namespace{RelAlgExecutor.cpp}::translate_targets(), and VLOG.

3281  {
3282  std::vector<InputDescriptor> input_descs;
3283  std::list<std::shared_ptr<const InputColDescriptor>> input_col_descs;
3284  auto input_to_nest_level = get_input_nest_levels(compound, {});
3285  std::tie(input_descs, input_col_descs, std::ignore) =
3286  get_input_desc(compound, input_to_nest_level, {}, cat_);
3287  VLOG(3) << "input_descs=" << shared::printContainer(input_descs);
3288  const auto query_infos = get_table_infos(input_descs, executor_);
3289  CHECK_EQ(size_t(1), compound->inputCount());
3290  const auto left_deep_join =
3291  dynamic_cast<const RelLeftDeepInnerJoin*>(compound->getInput(0));
3292  JoinQualsPerNestingLevel left_deep_join_quals;
3293  const auto join_types = left_deep_join ? left_deep_join_types(left_deep_join)
3294  : std::vector<JoinType>{get_join_type(compound)};
3295  std::vector<size_t> input_permutation;
3296  std::vector<size_t> left_deep_join_input_sizes;
3297  if (left_deep_join) {
3298  left_deep_join_input_sizes = get_left_deep_join_input_sizes(left_deep_join);
3299  left_deep_join_quals = translateLeftDeepJoinFilter(
3300  left_deep_join, input_descs, input_to_nest_level, eo.just_explain);
3301  if (g_from_table_reordering &&
3302  std::find(join_types.begin(), join_types.end(), JoinType::LEFT) ==
3303  join_types.end()) {
3304  input_permutation = do_table_reordering(input_descs,
3305  input_col_descs,
3306  left_deep_join_quals,
3307  input_to_nest_level,
3308  compound,
3309  query_infos,
3310  executor_);
3311  input_to_nest_level = get_input_nest_levels(compound, input_permutation);
3312  std::tie(input_descs, input_col_descs, std::ignore) =
3313  get_input_desc(compound, input_to_nest_level, input_permutation, cat_);
3314  left_deep_join_quals = translateLeftDeepJoinFilter(
3315  left_deep_join, input_descs, input_to_nest_level, eo.just_explain);
3316  }
3317  }
3318  RelAlgTranslator translator(cat_,
3319  query_state_,
3320  executor_,
3321  input_to_nest_level,
3322  join_types,
3323  now_,
3324  eo.just_explain);
3325  const auto scalar_sources =
3326  translate_scalar_sources(compound, translator, eo.executor_type);
3327  const auto groupby_exprs = translate_groupby_exprs(compound, scalar_sources);
3328  const auto quals_cf = translate_quals(compound, translator);
3329  const auto target_exprs = translate_targets(target_exprs_owned_,
3330  scalar_sources,
3331  groupby_exprs,
3332  compound,
3333  translator,
3334  eo.executor_type);
3335  CHECK_EQ(compound->size(), target_exprs.size());
3336  const RelAlgExecutionUnit exe_unit = {input_descs,
3337  input_col_descs,
3338  quals_cf.simple_quals,
3339  rewrite_quals(quals_cf.quals),
3340  left_deep_join_quals,
3341  groupby_exprs,
3342  target_exprs,
3343  nullptr,
3344  sort_info,
3345  0,
3346  false,
3347  std::nullopt,
3348  query_state_};
3349  auto query_rewriter = std::make_unique<QueryRewriter>(query_infos, executor_);
3350  const auto rewritten_exe_unit = query_rewriter->rewrite(exe_unit);
3351  const auto targets_meta = get_targets_meta(compound, rewritten_exe_unit.target_exprs);
3352  compound->setOutputMetainfo(targets_meta);
3353  return {rewritten_exe_unit,
3354  compound,
3355  max_groups_buffer_entry_default_guess,
3356  std::move(query_rewriter),
3357  input_permutation,
3358  left_deep_join_input_sizes};
3359 }
#define CHECK_EQ(x, y)
Definition: Logger.h:205
std::list< std::shared_ptr< Analyzer::Expr > > translate_groupby_exprs(const RelAggregate *aggregate, const std::vector< std::shared_ptr< Analyzer::Expr >> &scalar_sources)
std::unordered_map< const RelAlgNode *, int > get_input_nest_levels(const RelAlgNode *ra_node, const std::vector< size_t > &input_permutation)
std::vector< JoinType > left_deep_join_types(const RelLeftDeepInnerJoin *left_deep_join)
JoinType
Definition: sqldefs.h:107
std::vector< size_t > do_table_reordering(std::vector< InputDescriptor > &input_descs, std::list< std::shared_ptr< const InputColDescriptor >> &input_col_descs, const JoinQualsPerNestingLevel &left_deep_join_quals, std::unordered_map< const RelAlgNode *, int > &input_to_nest_level, const RA *node, const std::vector< InputTableInfo > &query_infos, const Executor *executor)
size_t size() const override
static const size_t max_groups_buffer_entry_default_guess
std::vector< JoinCondition > JoinQualsPerNestingLevel
std::vector< std::shared_ptr< Analyzer::Expr > > target_exprs_owned_
void setOutputMetainfo(const std::vector< TargetMetaInfo > &targets_metainfo) const
bool g_from_table_reordering
Definition: Execute.cpp:82
ExecutorType executor_type
const Catalog_Namespace::Catalog & cat_
JoinType get_join_type(const RelAlgNode *ra)
std::vector< TargetMetaInfo > get_targets_meta(const RelFilter *filter, const std::vector< Analyzer::Expr *> &target_exprs)
std::shared_ptr< const query_state::QueryState > query_state_
const size_t inputCount() const
std::vector< std::shared_ptr< Analyzer::Expr > > translate_scalar_sources(const RA *ra_node, const RelAlgTranslator &translator, const ::ExecutorType executor_type)
std::vector< InputTableInfo > get_table_infos(const std::vector< InputDescriptor > &input_descs, Executor *executor)
const RelAlgNode * getInput(const size_t idx) const
QualsConjunctiveForm translate_quals(const RelCompound *compound, const RelAlgTranslator &translator)
PrintContainer< CONTAINER > printContainer(CONTAINER &container)
Definition: misc.h:64
std::tuple< std::vector< InputDescriptor >, std::list< std::shared_ptr< const InputColDescriptor > >, std::vector< std::shared_ptr< RexInput > > > get_input_desc(const RA *ra_node, const std::unordered_map< const RelAlgNode *, int > &input_to_nest_level, const std::vector< size_t > &input_permutation, const Catalog_Namespace::Catalog &cat)
Executor * executor_
#define VLOG(n)
Definition: Logger.h:291
std::list< std::shared_ptr< Analyzer::Expr > > simple_quals
JoinQualsPerNestingLevel translateLeftDeepJoinFilter(const RelLeftDeepInnerJoin *join, const std::vector< InputDescriptor > &input_descs, const std::unordered_map< const RelAlgNode *, int > &input_to_nest_level, const bool just_explain)
std::vector< Analyzer::Expr * > translate_targets(std::vector< std::shared_ptr< Analyzer::Expr >> &target_exprs_owned, const std::vector< std::shared_ptr< Analyzer::Expr >> &scalar_sources, const std::list< std::shared_ptr< Analyzer::Expr >> &groupby_exprs, const RelAggregate *aggregate, const RelAlgTranslator &translator)
std::vector< size_t > get_left_deep_join_input_sizes(const RelLeftDeepInnerJoin *left_deep_join)
std::list< std::shared_ptr< Analyzer::Expr > > rewrite_quals(const std::list< std::shared_ptr< Analyzer::Expr >> &quals)
+ Here is the call graph for this function:

◆ createFilterWorkUnit()

RelAlgExecutor::WorkUnit RelAlgExecutor::createFilterWorkUnit ( const RelFilter * filter,
const SortInfo & sort_info,
const bool  just_explain 
)
private

Definition at line 3906 of file RelAlgExecutor.cpp.

References CHECK_EQ, ResultSet::executor_, fold_expr(), get_exprs_not_owned(), anonymous_namespace{RelAlgExecutor.cpp}::get_input_desc(), anonymous_namespace{RelAlgExecutor.cpp}::get_input_nest_levels(), anonymous_namespace{RelAlgExecutor.cpp}::get_inputs_meta(), anonymous_namespace{RelAlgExecutor.cpp}::get_join_type(), RelFilter::getCondition(), RelAlgNode::inputCount(), rewrite_expr(), RelAlgNode::setOutputMetainfo(), and speculative_topn_blacklist_.

3908  {
3909  CHECK_EQ(size_t(1), filter->inputCount());
3910  std::vector<InputDescriptor> input_descs;
3911  std::list<std::shared_ptr<const InputColDescriptor>> input_col_descs;
3912  std::vector<TargetMetaInfo> in_metainfo;
3913  std::vector<std::shared_ptr<RexInput>> used_inputs_owned;
3914  std::vector<std::shared_ptr<Analyzer::Expr>> target_exprs_owned;
3915 
3916  const auto input_to_nest_level = get_input_nest_levels(filter, {});
3917  std::tie(input_descs, input_col_descs, used_inputs_owned) =
3918  get_input_desc(filter, input_to_nest_level, {}, cat_);
3919  const auto join_type = get_join_type(filter);
3920  RelAlgTranslator translator(cat_,
3921  query_state_,
3922  executor_,
3923  input_to_nest_level,
3924  {join_type},
3925  now_,
3926  just_explain);
3927  std::tie(in_metainfo, target_exprs_owned) =
3928  get_inputs_meta(filter, translator, used_inputs_owned, input_to_nest_level);
3929  const auto filter_expr = translator.translateScalarRex(filter->getCondition());
3930  const auto qual = fold_expr(filter_expr.get());
3931  target_exprs_owned_.insert(
3932  target_exprs_owned_.end(), target_exprs_owned.begin(), target_exprs_owned.end());
3933  const auto target_exprs = get_exprs_not_owned(target_exprs_owned);
3934  filter->setOutputMetainfo(in_metainfo);
3935  const auto rewritten_qual = rewrite_expr(qual.get());
3936  return {{input_descs,
3937  input_col_descs,
3938  {},
3939  {rewritten_qual ? rewritten_qual : qual},
3940  {},
3941  {nullptr},
3942  target_exprs,
3943  nullptr,
3944  sort_info,
3945  0},
3946  filter,
3948  nullptr};
3949 }
#define CHECK_EQ(x, y)
Definition: Logger.h:205
std::unordered_map< const RelAlgNode *, int > get_input_nest_levels(const RelAlgNode *ra_node, const std::vector< size_t > &input_permutation)
std::pair< std::vector< TargetMetaInfo >, std::vector< std::shared_ptr< Analyzer::Expr > > > get_inputs_meta(const RelFilter *filter, const RelAlgTranslator &translator, const std::vector< std::shared_ptr< RexInput >> &inputs_owned, const std::unordered_map< const RelAlgNode *, int > &input_to_nest_level)
static const size_t max_groups_buffer_entry_default_guess
std::vector< Analyzer::Expr * > get_exprs_not_owned(const std::vector< std::shared_ptr< Analyzer::Expr >> &exprs)
Definition: Execute.h:226
Analyzer::ExpressionPtr rewrite_expr(const Analyzer::Expr *expr)
std::vector< std::shared_ptr< Analyzer::Expr > > target_exprs_owned_
void setOutputMetainfo(const std::vector< TargetMetaInfo > &targets_metainfo) const
const Catalog_Namespace::Catalog & cat_
JoinType get_join_type(const RelAlgNode *ra)
const RexScalar * getCondition() const
std::shared_ptr< const query_state::QueryState > query_state_
const size_t inputCount() const
std::tuple< std::vector< InputDescriptor >, std::list< std::shared_ptr< const InputColDescriptor > >, std::vector< std::shared_ptr< RexInput > > > get_input_desc(const RA *ra_node, const std::unordered_map< const RelAlgNode *, int > &input_to_nest_level, const std::vector< size_t > &input_permutation, const Catalog_Namespace::Catalog &cat)
Executor * executor_
std::shared_ptr< Analyzer::Expr > fold_expr(const Analyzer::Expr *expr)
+ Here is the call graph for this function:

◆ createJoinWorkUnit()

WorkUnit RelAlgExecutor::createJoinWorkUnit ( const RelJoin ,
const SortInfo ,
const bool  just_explain 
)
private

◆ createProjectWorkUnit()

RelAlgExecutor::WorkUnit RelAlgExecutor::createProjectWorkUnit ( const RelProject project,
const SortInfo sort_info,
const ExecutionOptions eo 
)
private

Definition at line 3599 of file RelAlgExecutor.cpp.

References anonymous_namespace{RelAlgExecutor.cpp}::do_table_reordering(), ResultSet::executor_, ExecutionOptions::executor_type, g_from_table_reordering, get_exprs_not_owned(), anonymous_namespace{RelAlgExecutor.cpp}::get_input_desc(), anonymous_namespace{RelAlgExecutor.cpp}::get_input_nest_levels(), anonymous_namespace{RelAlgExecutor.cpp}::get_join_type(), anonymous_namespace{RelAlgExecutor.cpp}::get_left_deep_join_input_sizes(), get_table_infos(), anonymous_namespace{RelAlgExecutor.cpp}::get_targets_meta(), RelAlgNode::getInput(), ExecutionOptions::just_explain, anonymous_namespace{RelAlgExecutor.cpp}::left_deep_join_types(), RelAlgNode::setOutputMetainfo(), and anonymous_namespace{RelAlgExecutor.cpp}::translate_scalar_sources().

3602  {
3603  std::vector<InputDescriptor> input_descs;
3604  std::list<std::shared_ptr<const InputColDescriptor>> input_col_descs;
3605  auto input_to_nest_level = get_input_nest_levels(project, {});
3606  std::tie(input_descs, input_col_descs, std::ignore) =
3607  get_input_desc(project, input_to_nest_level, {}, cat_);
3608  const auto query_infos = get_table_infos(input_descs, executor_);
3609 
3610  const auto left_deep_join =
3611  dynamic_cast<const RelLeftDeepInnerJoin*>(project->getInput(0));
3612  JoinQualsPerNestingLevel left_deep_join_quals;
3613  const auto join_types = left_deep_join ? left_deep_join_types(left_deep_join)
3614  : std::vector<JoinType>{get_join_type(project)};
3615  std::vector<size_t> input_permutation;
3616  std::vector<size_t> left_deep_join_input_sizes;
3617  if (left_deep_join) {
3618  left_deep_join_input_sizes = get_left_deep_join_input_sizes(left_deep_join);
3619  const auto query_infos = get_table_infos(input_descs, executor_);
3620  left_deep_join_quals = translateLeftDeepJoinFilter(
3621  left_deep_join, input_descs, input_to_nest_level, eo.just_explain);
3623  input_permutation = do_table_reordering(input_descs,
3624  input_col_descs,
3625  left_deep_join_quals,
3626  input_to_nest_level,
3627  project,
3628  query_infos,
3629  executor_);
3630  input_to_nest_level = get_input_nest_levels(project, input_permutation);
3631  std::tie(input_descs, input_col_descs, std::ignore) =
3632  get_input_desc(project, input_to_nest_level, input_permutation, cat_);
3633  left_deep_join_quals = translateLeftDeepJoinFilter(
3634  left_deep_join, input_descs, input_to_nest_level, eo.just_explain);
3635  }
3636  }
3637 
3638  RelAlgTranslator translator(cat_,
3639  query_state_,
3640  executor_,
3641  input_to_nest_level,
3642  join_types,
3643  now_,
3644  eo.just_explain);
3645  const auto target_exprs_owned =
3646  translate_scalar_sources(project, translator, eo.executor_type);
3647  target_exprs_owned_.insert(
3648  target_exprs_owned_.end(), target_exprs_owned.begin(), target_exprs_owned.end());
3649  const auto target_exprs = get_exprs_not_owned(target_exprs_owned);
3650 
3651  const RelAlgExecutionUnit exe_unit = {input_descs,
3652  input_col_descs,
3653  {},
3654  {},
3655  left_deep_join_quals,
3656  {nullptr},
3657  target_exprs,
3658  nullptr,
3659  sort_info,
3660  0,
3661  false,
3662  std::nullopt,
3663  query_state_};
3664  auto query_rewriter = std::make_unique<QueryRewriter>(query_infos, executor_);
3665  const auto rewritten_exe_unit = query_rewriter->rewrite(exe_unit);
3666  const auto targets_meta = get_targets_meta(project, rewritten_exe_unit.target_exprs);
3667  project->setOutputMetainfo(targets_meta);
3668  return {rewritten_exe_unit,
3669  project,
3671  std::move(query_rewriter),
3672  input_permutation,
3673  left_deep_join_input_sizes};
3674 }
std::unordered_map< const RelAlgNode *, int > get_input_nest_levels(const RelAlgNode *ra_node, const std::vector< size_t > &input_permutation)
std::vector< JoinType > left_deep_join_types(const RelLeftDeepInnerJoin *left_deep_join)
JoinType
Definition: sqldefs.h:107
std::vector< size_t > do_table_reordering(std::vector< InputDescriptor > &input_descs, std::list< std::shared_ptr< const InputColDescriptor >> &input_col_descs, const JoinQualsPerNestingLevel &left_deep_join_quals, std::unordered_map< const RelAlgNode *, int > &input_to_nest_level, const RA *node, const std::vector< InputTableInfo > &query_infos, const Executor *executor)
static const size_t max_groups_buffer_entry_default_guess
std::vector< JoinCondition > JoinQualsPerNestingLevel
std::vector< Analyzer::Expr * > get_exprs_not_owned(const std::vector< std::shared_ptr< Analyzer::Expr >> &exprs)
Definition: Execute.h:226
std::vector< std::shared_ptr< Analyzer::Expr > > target_exprs_owned_
void setOutputMetainfo(const std::vector< TargetMetaInfo > &targets_metainfo) const
bool g_from_table_reordering
Definition: Execute.cpp:82
ExecutorType executor_type
const Catalog_Namespace::Catalog & cat_
JoinType get_join_type(const RelAlgNode *ra)
std::vector< TargetMetaInfo > get_targets_meta(const RelFilter *filter, const std::vector< Analyzer::Expr *> &target_exprs)
std::shared_ptr< const query_state::QueryState > query_state_
std::vector< std::shared_ptr< Analyzer::Expr > > translate_scalar_sources(const RA *ra_node, const RelAlgTranslator &translator, const ::ExecutorType executor_type)
std::vector< InputTableInfo > get_table_infos(const std::vector< InputDescriptor > &input_descs, Executor *executor)
const RelAlgNode * getInput(const size_t idx) const
std::tuple< std::vector< InputDescriptor >, std::list< std::shared_ptr< const InputColDescriptor > >, std::vector< std::shared_ptr< RexInput > > > get_input_desc(const RA *ra_node, const std::unordered_map< const RelAlgNode *, int > &input_to_nest_level, const std::vector< size_t > &input_permutation, const Catalog_Namespace::Catalog &cat)
Executor * executor_
JoinQualsPerNestingLevel translateLeftDeepJoinFilter(const RelLeftDeepInnerJoin *join, const std::vector< InputDescriptor > &input_descs, const std::unordered_map< const RelAlgNode *, int > &input_to_nest_level, const bool just_explain)
std::vector< size_t > get_left_deep_join_input_sizes(const RelLeftDeepInnerJoin *left_deep_join)
+ Here is the call graph for this function:

◆ createSortInputWorkUnit()

RelAlgExecutor::WorkUnit RelAlgExecutor::createSortInputWorkUnit ( const RelSort sort,
const ExecutionOptions eo 
)
private

Definition at line 2484 of file RelAlgExecutor.cpp.

References CHECK_GT, RelSort::collationCount(), Default, anonymous_namespace{RelAlgExecutor.cpp}::first_oe_is_desc(), anonymous_namespace{RelAlgExecutor.cpp}::get_order_entries(), anonymous_namespace{RelAlgExecutor.cpp}::get_scan_limit(), get_target_info(), RelAlgNode::getInput(), RelSort::getLimit(), RelSort::getOffset(), RelAlgExecutionUnit::input_descs, RelAlgNode::setOutputMetainfo(), SpeculativeTopN, and StreamingTopN.

2486  {
2487  const auto source = sort->getInput(0);
2488  const size_t limit = sort->getLimit();
2489  const size_t offset = sort->getOffset();
2490  const size_t scan_limit = sort->collationCount() ? 0 : get_scan_limit(source, limit);
2491  const size_t scan_total_limit =
2492  scan_limit ? get_scan_limit(source, scan_limit + offset) : 0;
2493  size_t max_groups_buffer_entry_guess{
2494  scan_total_limit ? scan_total_limit : max_groups_buffer_entry_default_guess};
2496  const auto order_entries = get_order_entries(sort);
2497  SortInfo sort_info{order_entries, sort_algorithm, limit, offset};
2498  auto source_work_unit = createWorkUnit(source, sort_info, eo);
2499  const auto& source_exe_unit = source_work_unit.exe_unit;
2500 
2501  // we do not allow sorting geometry or array types
2502  for (auto order_entry : order_entries) {
2503  CHECK_GT(order_entry.tle_no, 0); // tle_no is a 1-base index
2504  const auto& te = source_exe_unit.target_exprs[order_entry.tle_no - 1];
2505  const auto& ti = get_target_info(te, false);
2506  if (ti.sql_type.is_geometry() || ti.sql_type.is_array()) {
2507  throw std::runtime_error(
2508  "Columns with geometry or array types cannot be used in an ORDER BY clause.");
2509  }
2510  }
2511 
2512  if (source_exe_unit.groupby_exprs.size() == 1) {
2513  if (!source_exe_unit.groupby_exprs.front()) {
2514  sort_algorithm = SortAlgorithm::StreamingTopN;
2515  } else {
2516  if (speculative_topn_blacklist_.contains(source_exe_unit.groupby_exprs.front(),
2517  first_oe_is_desc(order_entries))) {
2518  sort_algorithm = SortAlgorithm::Default;
2519  }
2520  }
2521  }
2522 
2523  sort->setOutputMetainfo(source->getOutputMetainfo());
2524  // NB: the `body` field of the returned `WorkUnit` needs to be the `source` node,
2525  // not the `sort`. The aggregator needs the pre-sorted result from leaves.
2526  return {RelAlgExecutionUnit{source_exe_unit.input_descs,
2527  std::move(source_exe_unit.input_col_descs),
2528  source_exe_unit.simple_quals,
2529  source_exe_unit.quals,
2530  source_exe_unit.join_quals,
2531  source_exe_unit.groupby_exprs,
2532  source_exe_unit.target_exprs,
2533  nullptr,
2534  {sort_info.order_entries, sort_algorithm, limit, offset},
2535  scan_total_limit,
2536  source_exe_unit.use_bump_allocator,
2537  source_exe_unit.union_all,
2538  source_exe_unit.query_state},
2539  source,
2540  max_groups_buffer_entry_guess,
2541  std::move(source_work_unit.query_rewriter),
2542  source_work_unit.input_permutation,
2543  source_work_unit.left_deep_join_input_sizes};
2544 }
size_t collationCount() const
size_t get_scan_limit(const RelAlgNode *ra, const size_t limit)
TargetInfo get_target_info(const PointerType target_expr, const bool bigint_count)
Definition: TargetInfo.h:78
static SpeculativeTopNBlacklist speculative_topn_blacklist_
static const size_t max_groups_buffer_entry_default_guess
bool contains(const std::shared_ptr< Analyzer::Expr > expr, const bool desc) const
std::vector< InputDescriptor > input_descs
#define CHECK_GT(x, y)
Definition: Logger.h:209
WorkUnit createWorkUnit(const RelAlgNode *, const SortInfo &, const ExecutionOptions &eo)
void setOutputMetainfo(const std::vector< TargetMetaInfo > &targets_metainfo) const
SortAlgorithm
size_t getOffset() const
size_t getLimit() const
std::list< Analyzer::OrderEntry > get_order_entries(const RelSort *sort)
const RelAlgNode * getInput(const size_t idx) const
bool first_oe_is_desc(const std::list< Analyzer::OrderEntry > &order_entries)
+ Here is the call graph for this function:

◆ createTableFunctionWorkUnit()

RelAlgExecutor::TableFunctionWorkUnit RelAlgExecutor::createTableFunctionWorkUnit ( const RelTableFunction table_func,
const bool  just_explain,
const bool  is_gpu 
)
private

Definition at line 3788 of file RelAlgExecutor.cpp.

References bind_table_function(), CHECK_EQ, CHECK_GT, ResultSet::executor_, get_exprs_not_owned(), anonymous_namespace{RelAlgExecutor.cpp}::get_input_desc(), anonymous_namespace{RelAlgExecutor.cpp}::get_input_nest_levels(), get_table_infos(), anonymous_namespace{RelAlgExecutor.cpp}::get_targets_meta(), RelTableFunction::getColInputsSize(), RelTableFunction::getFunctionName(), table_functions::TableFunction::getOutputRowSizeParameter(), RelTableFunction::getTableFuncInputAt(), Native, RelAlgNode::setOutputMetainfo(), TableFunctionExecutionUnit::target_exprs, to_string(), anonymous_namespace{RelAlgExecutor.cpp}::translate_scalar_sources(), and UNREACHABLE.

3791  {
3792  std::vector<InputDescriptor> input_descs;
3793  std::list<std::shared_ptr<const InputColDescriptor>> input_col_descs;
3794  auto input_to_nest_level = get_input_nest_levels(table_func, {});
3795  std::tie(input_descs, input_col_descs, std::ignore) =
3796  get_input_desc(table_func, input_to_nest_level, {}, cat_);
3797  const auto query_infos = get_table_infos(input_descs, executor_);
3798  RelAlgTranslator translator(
3799  cat_, query_state_, executor_, input_to_nest_level, {}, now_, just_explain);
3800  const auto input_exprs_owned =
3801  translate_scalar_sources(table_func, translator, ::ExecutorType::Native);
3802  target_exprs_owned_.insert(
3803  target_exprs_owned_.end(), input_exprs_owned.begin(), input_exprs_owned.end());
3804  const auto input_exprs = get_exprs_not_owned(input_exprs_owned);
3805 
3806  const auto table_function_impl =
3807  bind_table_function(table_func->getFunctionName(), input_exprs_owned, is_gpu);
3808 
3809  size_t output_row_sizing_param = 0;
3810  if (table_function_impl.hasUserSpecifiedOutputSizeMultiplier() ||
3811  table_function_impl.hasUserSpecifiedOutputSizeConstant()) {
3812  const auto parameter_index = table_function_impl.getOutputRowSizeParameter();
3813  CHECK_GT(parameter_index, size_t(0));
3814  const auto parameter_expr = table_func->getTableFuncInputAt(parameter_index - 1);
3815  const auto parameter_expr_literal = dynamic_cast<const RexLiteral*>(parameter_expr);
3816  if (!parameter_expr_literal) {
3817  throw std::runtime_error(
3818  "Provided output buffer sizing parameter is not a literal. Only literal "
3819  "values are supported with output buffer sizing configured table "
3820  "functions.");
3821  }
3822  int64_t literal_val = parameter_expr_literal->getVal<int64_t>();
3823  if (literal_val < 0) {
3824  throw std::runtime_error("Provided output sizing parameter " +
3825  std::to_string(literal_val) +
3826  " must be positive integer.");
3827  }
3828  output_row_sizing_param = static_cast<size_t>(literal_val);
3829  } else if (table_function_impl.hasNonUserSpecifiedOutputSizeConstant()) {
3830  output_row_sizing_param = table_function_impl.getOutputRowSizeParameter();
3831  } else {
3832  UNREACHABLE();
3833  }
3834 
3835  std::vector<Analyzer::ColumnVar*> input_col_exprs;
3836  size_t index = 0;
3837  for (auto input_expr : input_exprs) {
3838  if (auto col_var = dynamic_cast<Analyzer::ColumnVar*>(input_expr)) {
3839  input_expr->set_type_info(table_function_impl.getInputSQLType(index));
3840  input_col_exprs.push_back(col_var);
3841  }
3842  index++;
3843  }
3844  CHECK_EQ(input_col_exprs.size(), table_func->getColInputsSize());
3845  std::vector<Analyzer::Expr*> table_func_outputs;
3846  for (size_t i = 0; i < table_function_impl.getOutputsSize(); i++) {
3847  const auto ti = table_function_impl.getOutputSQLType(i);
3848  target_exprs_owned_.push_back(std::make_shared<Analyzer::ColumnVar>(ti, 0, i, -1));
3849  table_func_outputs.push_back(target_exprs_owned_.back().get());
3850  }
3851  const TableFunctionExecutionUnit exe_unit = {
3852  input_descs,
3853  input_col_descs,
3854  input_exprs, // table function inputs
3855  input_col_exprs, // table function column inputs (duplicates w/ above)
3856  table_func_outputs, // table function projected exprs
3857  output_row_sizing_param, // output buffer sizing param
3858  table_function_impl};
3859  const auto targets_meta = get_targets_meta(table_func, exe_unit.target_exprs);
3860  table_func->setOutputMetainfo(targets_meta);
3861  return {exe_unit, table_func};
3862 }
std::string getFunctionName() const
#define CHECK_EQ(x, y)
Definition: Logger.h:205
std::unordered_map< const RelAlgNode *, int > get_input_nest_levels(const RelAlgNode *ra_node, const std::vector< size_t > &input_permutation)
size_t getColInputsSize() const
#define UNREACHABLE()
Definition: Logger.h:241
const RexScalar * getTableFuncInputAt(const size_t idx) const
std::vector< Analyzer::Expr * > get_exprs_not_owned(const std::vector< std::shared_ptr< Analyzer::Expr >> &exprs)
Definition: Execute.h:226
#define CHECK_GT(x, y)
Definition: Logger.h:209
std::string to_string(char const *&&v)
std::vector< std::shared_ptr< Analyzer::Expr > > target_exprs_owned_
void setOutputMetainfo(const std::vector< TargetMetaInfo > &targets_metainfo) const
const Catalog_Namespace::Catalog & cat_
const table_functions::TableFunction bind_table_function(std::string name, Analyzer::ExpressionPtrVector input_args, const std::vector< table_functions::TableFunction > &table_funcs)
std::vector< TargetMetaInfo > get_targets_meta(const RelFilter *filter, const std::vector< Analyzer::Expr *> &target_exprs)
std::shared_ptr< const query_state::QueryState > query_state_
std::vector< std::shared_ptr< Analyzer::Expr > > translate_scalar_sources(const RA *ra_node, const RelAlgTranslator &translator, const ::ExecutorType executor_type)
std::vector< InputTableInfo > get_table_infos(const std::vector< InputDescriptor > &input_descs, Executor *executor)
std::vector< Analyzer::Expr * > target_exprs
std::tuple< std::vector< InputDescriptor >, std::list< std::shared_ptr< const InputColDescriptor > >, std::vector< std::shared_ptr< RexInput > > > get_input_desc(const RA *ra_node, const std::unordered_map< const RelAlgNode *, int > &input_to_nest_level, const std::vector< size_t > &input_permutation, const Catalog_Namespace::Catalog &cat)
Executor * executor_
+ Here is the call graph for this function:

◆ createUnionWorkUnit()

RelAlgExecutor::WorkUnit RelAlgExecutor::createUnionWorkUnit ( const RelLogicalUnion logical_union,
const SortInfo sort_info,
const ExecutionOptions eo 
)
private

Definition at line 3694 of file RelAlgExecutor.cpp.

References CHECK, ResultSet::executor_, get_exprs_not_owned(), anonymous_namespace{RelAlgExecutor.cpp}::get_input_desc(), anonymous_namespace{RelAlgExecutor.cpp}::get_input_nest_levels(), get_table_infos(), anonymous_namespace{RelAlgExecutor.cpp}::get_targets_meta(), RelAlgNode::getInput(), RelAlgNode::getOutputMetainfo(), RelLogicalUnion::isAll(), ExecutionOptions::just_explain, shared::printContainer(), RelAlgNode::setOutputMetainfo(), anonymous_namespace{RelAlgExecutor.cpp}::target_exprs_for_union(), RelAlgNode::toString(), and VLOG.

3697  {
3698  std::vector<InputDescriptor> input_descs;
3699  std::list<std::shared_ptr<const InputColDescriptor>> input_col_descs;
3700  // Map ra input ptr to index (0, 1).
3701  auto input_to_nest_level = get_input_nest_levels(logical_union, {});
3702  std::tie(input_descs, input_col_descs, std::ignore) =
3703  get_input_desc(logical_union, input_to_nest_level, {}, cat_);
3704  const auto query_infos = get_table_infos(input_descs, executor_);
3705  auto const max_num_tuples =
3706  std::accumulate(query_infos.cbegin(),
3707  query_infos.cend(),
3708  size_t(0),
3709  [](auto max, auto const& query_info) {
3710  return std::max(max, query_info.info.getNumTuples());
3711  });
3712 
3713  VLOG(3) << "input_to_nest_level.size()=" << input_to_nest_level.size() << " Pairs are:";
3714  for (auto& pair : input_to_nest_level) {
3715  VLOG(3) << " (" << pair.first->toString() << ", " << pair.second << ')';
3716  }
3717 
3718  RelAlgTranslator translator(
3719  cat_, query_state_, executor_, input_to_nest_level, {}, now_, eo.just_explain);
3720 
3721  auto const input_exprs_owned = target_exprs_for_union(logical_union->getInput(0));
3722  CHECK(!input_exprs_owned.empty())
3723  << "No metainfo found for input node " << logical_union->getInput(0)->toString();
3724  VLOG(3) << "input_exprs_owned.size()=" << input_exprs_owned.size();
3725  for (auto& input_expr : input_exprs_owned) {
3726  VLOG(3) << " " << input_expr->toString();
3727  }
3728  target_exprs_owned_.insert(
3729  target_exprs_owned_.end(), input_exprs_owned.begin(), input_exprs_owned.end());
3730  const auto target_exprs = get_exprs_not_owned(input_exprs_owned);
3731 
3732  VLOG(3) << "input_descs=" << shared::printContainer(input_descs)
3733  << " input_col_descs=" << shared::printContainer(input_col_descs)
3734  << " target_exprs.size()=" << target_exprs.size()
3735  << " max_num_tuples=" << max_num_tuples;
3736 
3737  const RelAlgExecutionUnit exe_unit = {input_descs,
3738  input_col_descs,
3739  {}, // quals_cf.simple_quals,
3740  {}, // rewrite_quals(quals_cf.quals),
3741  {},
3742  {nullptr},
3743  target_exprs,
3744  nullptr,
3745  sort_info,
3746  max_num_tuples,
3747  false,
3748  logical_union->isAll(),
3749  query_state_};
3750  auto query_rewriter = std::make_unique<QueryRewriter>(query_infos, executor_);
3751  const auto rewritten_exe_unit = query_rewriter->rewrite(exe_unit);
3752 
3753  RelAlgNode const* input0 = logical_union->getInput(0);
3754  if (auto const* node = dynamic_cast<const RelCompound*>(input0)) {
3755  logical_union->setOutputMetainfo(
3756  get_targets_meta(node, rewritten_exe_unit.target_exprs));
3757  } else if (auto const* node = dynamic_cast<const RelProject*>(input0)) {
3758  logical_union->setOutputMetainfo(
3759  get_targets_meta(node, rewritten_exe_unit.target_exprs));
3760  } else if (auto const* node = dynamic_cast<const RelLogicalUnion*>(input0)) {
3761  logical_union->setOutputMetainfo(
3762  get_targets_meta(node, rewritten_exe_unit.target_exprs));
3763  } else if (auto const* node = dynamic_cast<const RelAggregate*>(input0)) {
3764  logical_union->setOutputMetainfo(
3765  get_targets_meta(node, rewritten_exe_unit.target_exprs));
3766  } else if (auto const* node = dynamic_cast<const RelScan*>(input0)) {
3767  logical_union->setOutputMetainfo(
3768  get_targets_meta(node, rewritten_exe_unit.target_exprs));
3769  } else if (auto const* node = dynamic_cast<const RelFilter*>(input0)) {
3770  logical_union->setOutputMetainfo(
3771  get_targets_meta(node, rewritten_exe_unit.target_exprs));
3772  } else if (dynamic_cast<const RelSort*>(input0)) {
3773  throw QueryNotSupported("LIMIT and OFFSET are not currently supported with UNION.");
3774  } else {
3775  throw QueryNotSupported("Unsupported input type: " + input0->toString());
3776  }
3777  VLOG(3) << "logical_union->getOutputMetainfo()="
3778  << shared::printContainer(logical_union->getOutputMetainfo())
3779  << " rewritten_exe_unit.input_col_descs.front()->getScanDesc().getTableId()="
3780  << rewritten_exe_unit.input_col_descs.front()->getScanDesc().getTableId();
3781 
3782  return {rewritten_exe_unit,
3783  logical_union,
3785  std::move(query_rewriter)};
3786 }
std::unordered_map< const RelAlgNode *, int > get_input_nest_levels(const RelAlgNode *ra_node, const std::vector< size_t > &input_permutation)
static const size_t max_groups_buffer_entry_default_guess
bool isAll() const
std::vector< Analyzer::Expr * > get_exprs_not_owned(const std::vector< std::shared_ptr< Analyzer::Expr >> &exprs)
Definition: Execute.h:226
std::vector< std::shared_ptr< Analyzer::Expr > > target_exprs_owned_
void setOutputMetainfo(const std::vector< TargetMetaInfo > &targets_metainfo) const
const Catalog_Namespace::Catalog & cat_
std::vector< std::shared_ptr< Analyzer::Expr > > target_exprs_for_union(RelAlgNode const *input_node)
std::vector< TargetMetaInfo > get_targets_meta(const RelFilter *filter, const std::vector< Analyzer::Expr *> &target_exprs)
const std::vector< TargetMetaInfo > & getOutputMetainfo() const
std::shared_ptr< const query_state::QueryState > query_state_
virtual std::string toString() const =0
#define CHECK(condition)
Definition: Logger.h:197
std::vector< InputTableInfo > get_table_infos(const std::vector< InputDescriptor > &input_descs, Executor *executor)
const RelAlgNode * getInput(const size_t idx) const
PrintContainer< CONTAINER > printContainer(CONTAINER &container)
Definition: misc.h:64
std::tuple< std::vector< InputDescriptor >, std::list< std::shared_ptr< const InputColDescriptor > >, std::vector< std::shared_ptr< RexInput > > > get_input_desc(const RA *ra_node, const std::unordered_map< const RelAlgNode *, int > &input_to_nest_level, const std::vector< size_t > &input_permutation, const Catalog_Namespace::Catalog &cat)
Executor * executor_
#define VLOG(n)
Definition: Logger.h:291
+ Here is the call graph for this function:

◆ createWindowFunctionContext()

std::unique_ptr< WindowFunctionContext > RelAlgExecutor::createWindowFunctionContext ( const Analyzer::WindowFunction window_func,
const std::shared_ptr< Analyzer::BinOper > &  partition_key_cond,
const RelAlgExecutionUnit ra_exe_unit,
const std::vector< InputTableInfo > &  query_infos,
const CompilationOptions co,
ColumnCacheMap column_cache_map,
std::shared_ptr< RowSetMemoryOwner row_set_mem_owner 
)
private

Definition at line 1778 of file RelAlgExecutor.cpp.

References CHECK, CHECK_EQ, Data_Namespace::CPU_LEVEL, CompilationOptions::device_type, ResultSet::executor_, ColumnFetcher::getOneColumnFragment(), Analyzer::WindowFunction::getOrderKeys(), GPU, Data_Namespace::GPU_LEVEL, and JoinHashTableInterface::OneToMany.

1785  {
1786  const auto memory_level = co.device_type == ExecutorDeviceType::GPU
1789  const auto join_table_or_err =
1790  executor_->buildHashTableForQualifier(partition_key_cond,
1791  query_infos,
1792  memory_level,
1794  column_cache_map);
1795  if (!join_table_or_err.fail_reason.empty()) {
1796  throw std::runtime_error(join_table_or_err.fail_reason);
1797  }
1798  CHECK(join_table_or_err.hash_table->getHashType() ==
1800  const auto& order_keys = window_func->getOrderKeys();
1801  std::vector<std::shared_ptr<Chunk_NS::Chunk>> chunks_owner;
1802  const size_t elem_count = query_infos.front().info.fragments.front().getNumTuples();
1803  auto context = std::make_unique<WindowFunctionContext>(window_func,
1804  join_table_or_err.hash_table,
1805  elem_count,
1806  co.device_type,
1807  row_set_mem_owner);
1808  for (const auto& order_key : order_keys) {
1809  const auto order_col =
1810  std::dynamic_pointer_cast<const Analyzer::ColumnVar>(order_key);
1811  if (!order_col) {
1812  throw std::runtime_error("Only order by columns supported for now");
1813  }
1814  const int8_t* column;
1815  size_t join_col_elem_count;
1816  std::tie(column, join_col_elem_count) =
1818  *order_col,
1819  query_infos.front().info.fragments.front(),
1820  memory_level,
1821  0,
1822  nullptr,
1823  chunks_owner,
1824  column_cache_map);
1825  CHECK_EQ(join_col_elem_count, elem_count);
1826  context->addOrderColumn(column, order_col.get(), chunks_owner);
1827  }
1828  return context;
1829 }
#define CHECK_EQ(x, y)
Definition: Logger.h:205
const std::vector< std::shared_ptr< Analyzer::Expr > > & getOrderKeys() const
Definition: Analyzer.h:1455
ExecutorDeviceType device_type
#define CHECK(condition)
Definition: Logger.h:197
static std::pair< const int8_t *, size_t > getOneColumnFragment(Executor *executor, const Analyzer::ColumnVar &hash_col, const Fragmenter_Namespace::FragmentInfo &fragment, const Data_Namespace::MemoryLevel effective_mem_lvl, const int device_id, DeviceAllocator *device_allocator, std::vector< std::shared_ptr< Chunk_NS::Chunk >> &chunks_owner, ColumnCacheMap &column_cache)
Gets one chunk's pointer and element count on either CPU or GPU.
Executor * executor_
+ Here is the call graph for this function:

◆ createWorkUnit()

RelAlgExecutor::WorkUnit RelAlgExecutor::createWorkUnit ( const RelAlgNode node,
const SortInfo sort_info,
const ExecutionOptions eo 
)
private

Definition at line 3106 of file RelAlgExecutor.cpp.

References logger::FATAL, ExecutionOptions::just_explain, LOG, and RelAlgNode::toString().

3108  {
3109  const auto compound = dynamic_cast<const RelCompound*>(node);
3110  if (compound) {
3111  return createCompoundWorkUnit(compound, sort_info, eo);
3112  }
3113  const auto project = dynamic_cast<const RelProject*>(node);
3114  if (project) {
3115  return createProjectWorkUnit(project, sort_info, eo);
3116  }
3117  const auto aggregate = dynamic_cast<const RelAggregate*>(node);
3118  if (aggregate) {
3119  return createAggregateWorkUnit(aggregate, sort_info, eo.just_explain);
3120  }
3121  const auto filter = dynamic_cast<const RelFilter*>(node);
3122  if (filter) {
3123  return createFilterWorkUnit(filter, sort_info, eo.just_explain);
3124  }
3125  LOG(FATAL) << "Unhandled node type: " << node->toString();
3126  return {};
3127 }
WorkUnit createProjectWorkUnit(const RelProject *, const SortInfo &, const ExecutionOptions &eo)
WorkUnit createAggregateWorkUnit(const RelAggregate *, const SortInfo &, const bool just_explain)
#define LOG(tag)
Definition: Logger.h:188
WorkUnit createCompoundWorkUnit(const RelCompound *, const SortInfo &, const ExecutionOptions &eo)
virtual std::string toString() const =0
WorkUnit createFilterWorkUnit(const RelFilter *, const SortInfo &, const bool just_explain)
+ Here is the call graph for this function:

◆ eraseFromTemporaryTables()

void RelAlgExecutor::eraseFromTemporaryTables ( const int table_id)
inline, private

Definition at line 346 of file RelAlgExecutor.h.

References join().

346 { temporary_tables_.erase(table_id); }
TemporaryTables temporary_tables_
+ Here is the call graph for this function:

◆ executeAggregate()

ExecutionResult RelAlgExecutor::executeAggregate ( const RelAggregate *aggregate,
const CompilationOptions &co,
const ExecutionOptions &eo,
RenderInfo *render_info,
const int64_t queue_time_ms 
)
private

Definition at line 1590 of file RelAlgExecutor.cpp.

References DEBUG_TIMER, Default, RelAlgNode::getOutputMetainfo(), and ExecutionOptions::just_explain.

1594  {
1595  auto timer = DEBUG_TIMER(__func__);
1596  const auto work_unit = createAggregateWorkUnit(
1597  aggregate, {{}, SortAlgorithm::Default, 0, 0}, eo.just_explain);
1598  return executeWorkUnit(work_unit,
1599  aggregate->getOutputMetainfo(),
1600  true,
1601  co,
1602  eo,
1603  render_info,
1604  queue_time_ms);
1605 }
WorkUnit createAggregateWorkUnit(const RelAggregate *, const SortInfo &, const bool just_explain)
const std::vector< TargetMetaInfo > & getOutputMetainfo() const
#define DEBUG_TIMER(name)
Definition: Logger.h:313
ExecutionResult executeWorkUnit(const WorkUnit &work_unit, const std::vector< TargetMetaInfo > &targets_meta, const bool is_agg, const CompilationOptions &co_in, const ExecutionOptions &eo, RenderInfo *, const int64_t queue_time_ms, const std::optional< size_t > previous_count=std::nullopt)
+ Here is the call graph for this function:

◆ executeCompound()

ExecutionResult RelAlgExecutor::executeCompound ( const RelCompound *compound,
const CompilationOptions &co,
const ExecutionOptions &eo,
RenderInfo *render_info,
const int64_t queue_time_ms 
)
private

Definition at line 1572 of file RelAlgExecutor.cpp.

References DEBUG_TIMER, Default, RelAlgNode::getOutputMetainfo(), and RelCompound::isAggregate().

1576  {
1577  auto timer = DEBUG_TIMER(__func__);
1578  const auto work_unit =
1579  createCompoundWorkUnit(compound, {{}, SortAlgorithm::Default, 0, 0}, eo);
1580  CompilationOptions co_compound = co;
1581  return executeWorkUnit(work_unit,
1582  compound->getOutputMetainfo(),
1583  compound->isAggregate(),
1584  co_compound,
1585  eo,
1586  render_info,
1587  queue_time_ms);
1588 }
WorkUnit createCompoundWorkUnit(const RelCompound *, const SortInfo &, const ExecutionOptions &eo)
const std::vector< TargetMetaInfo > & getOutputMetainfo() const
bool isAggregate() const
#define DEBUG_TIMER(name)
Definition: Logger.h:313
ExecutionResult executeWorkUnit(const WorkUnit &work_unit, const std::vector< TargetMetaInfo > &targets_meta, const bool is_agg, const CompilationOptions &co_in, const ExecutionOptions &eo, RenderInfo *, const int64_t queue_time_ms, const std::optional< size_t > previous_count=std::nullopt)
+ Here is the call graph for this function:

◆ executeDelete()

void RelAlgExecutor::executeDelete ( const RelAlgNode *node,
const CompilationOptions &co,
const ExecutionOptions &eo_in,
const int64_t queue_time_ms 
)
private

Definition at line 1487 of file RelAlgExecutor.cpp.

References CHECK, CHECK_EQ, DEBUG_TIMER, Default, ResultSet::executor_, CompilationOptions::filter_on_deleted_column, get_table_infos(), get_temporary_table(), CacheInvalidator< CACHE_HOLDING_TYPES >::invalidateCaches(), CompilationOptions::makeCpuOnly(), and table_is_temporary().

1490  {
1491  CHECK(node);
1492  auto timer = DEBUG_TIMER(__func__);
1493 
1495 
1496  auto execute_delete_for_node = [this, &co, &eo_in](const auto node,
1497  auto& work_unit,
1498  const bool is_aggregate) {
1499  auto* table_descriptor = node->getModifiedTableDescriptor();
1500  CHECK(table_descriptor);
1501  if (!table_descriptor->hasDeletedCol) {
1502  throw std::runtime_error(
1503  "DELETE only supported on tables with the vacuum attribute set to 'delayed'");
1504  }
1505 
1506  const auto table_infos = get_table_infos(work_unit.exe_unit, executor_);
1507 
1508  auto execute_delete_ra_exe_unit =
1509  [this, &table_infos, &table_descriptor, &eo_in, &co](const auto& exe_unit,
1510  const bool is_aggregate) {
1511  DeleteTransactionParameters delete_params(table_is_temporary(table_descriptor));
1512  auto delete_callback = yieldDeleteCallback(delete_params);
1514 
1515  auto eo = eo_in;
1516  if (delete_params.tableIsTemporary()) {
1517  eo.output_columnar_hint = true;
1518  co_delete.filter_on_deleted_column =
1519  false; // project the entire delete column for columnar update
1520  } else {
1521  CHECK_EQ(exe_unit.target_exprs.size(), size_t(1));
1522  }
1523 
1524  executor_->executeUpdate(exe_unit,
1525  table_infos,
1526  co_delete,
1527  eo,
1528  cat_,
1529  executor_->row_set_mem_owner_,
1530  delete_callback,
1531  is_aggregate);
1532  delete_params.finalizeTransaction();
1533  };
1534 
1535  if (table_is_temporary(table_descriptor)) {
1536  auto query_rewrite = std::make_unique<QueryRewriter>(table_infos, executor_);
1537  auto cd = cat_.getDeletedColumn(table_descriptor);
1538  CHECK(cd);
1539  auto delete_column_expr = makeExpr<Analyzer::ColumnVar>(
1540  cd->columnType, table_descriptor->tableId, cd->columnId, 0);
1541  const auto rewritten_exe_unit =
1542  query_rewrite->rewriteColumnarDelete(work_unit.exe_unit, delete_column_expr);
1543  execute_delete_ra_exe_unit(rewritten_exe_unit, is_aggregate);
1544  } else {
1545  execute_delete_ra_exe_unit(work_unit.exe_unit, is_aggregate);
1546  }
1547  };
1548 
1549  if (auto compound = dynamic_cast<const RelCompound*>(node)) {
1550  const auto work_unit =
1551  createCompoundWorkUnit(compound, {{}, SortAlgorithm::Default, 0, 0}, eo_in);
1552  execute_delete_for_node(compound, work_unit, compound->isAggregate());
1553  } else if (auto project = dynamic_cast<const RelProject*>(node)) {
1554  auto work_unit =
1555  createProjectWorkUnit(project, {{}, SortAlgorithm::Default, 0, 0}, eo_in);
1556  if (project->isSimple()) {
1557  CHECK_EQ(size_t(1), project->inputCount());
1558  const auto input_ra = project->getInput(0);
1559  if (dynamic_cast<const RelSort*>(input_ra)) {
1560  const auto& input_table =
1561  get_temporary_table(&temporary_tables_, -input_ra->getId());
1562  CHECK(input_table);
1563  work_unit.exe_unit.scan_limit = input_table->rowCount();
1564  }
1565  }
1566  execute_delete_for_node(project, work_unit, false);
1567  } else {
1568  throw std::runtime_error("Unsupported parent node for delete: " + node->toString());
1569  }
1570 }
#define CHECK_EQ(x, y)
Definition: Logger.h:205
WorkUnit createProjectWorkUnit(const RelProject *, const SortInfo &, const ExecutionOptions &eo)
TemporaryTables temporary_tables_
static void invalidateCaches()
WorkUnit createCompoundWorkUnit(const RelCompound *, const SortInfo &, const ExecutionOptions &eo)
const ResultSetPtr & get_temporary_table(const TemporaryTables *temporary_tables, const int table_id)
Definition: Execute.h:191
static CompilationOptions makeCpuOnly(const CompilationOptions &in)
UpdateCallback yieldDeleteCallback(DeleteTransactionParameters &delete_parameters)
const Catalog_Namespace::Catalog & cat_
bool table_is_temporary(const TableDescriptor *const td)
const ColumnDescriptor * getDeletedColumn(const TableDescriptor *td) const
Definition: Catalog.cpp:2854
#define CHECK(condition)
Definition: Logger.h:197
std::vector< InputTableInfo > get_table_infos(const std::vector< InputDescriptor > &input_descs, Executor *executor)
#define DEBUG_TIMER(name)
Definition: Logger.h:313
Executor * executor_
+ Here is the call graph for this function:

◆ executeFilter()

ExecutionResult RelAlgExecutor::executeFilter ( const RelFilter *filter,
const CompilationOptions &co,
const ExecutionOptions &eo,
RenderInfo *render_info,
const int64_t queue_time_ms 
)
private

Definition at line 1831 of file RelAlgExecutor.cpp.

References DEBUG_TIMER, Default, RelAlgNode::getOutputMetainfo(), and ExecutionOptions::just_explain.

1835  {
1836  auto timer = DEBUG_TIMER(__func__);
1837  const auto work_unit =
1838  createFilterWorkUnit(filter, {{}, SortAlgorithm::Default, 0, 0}, eo.just_explain);
1839  return executeWorkUnit(
1840  work_unit, filter->getOutputMetainfo(), false, co, eo, render_info, queue_time_ms);
1841 }
const std::vector< TargetMetaInfo > & getOutputMetainfo() const
#define DEBUG_TIMER(name)
Definition: Logger.h:313
ExecutionResult executeWorkUnit(const WorkUnit &work_unit, const std::vector< TargetMetaInfo > &targets_meta, const bool is_agg, const CompilationOptions &co_in, const ExecutionOptions &eo, RenderInfo *, const int64_t queue_time_ms, const std::optional< size_t > previous_count=std::nullopt)
WorkUnit createFilterWorkUnit(const RelFilter *, const SortInfo &, const bool just_explain)
+ Here is the call graph for this function:

◆ executeLogicalValues()

ExecutionResult RelAlgExecutor::executeLogicalValues ( const RelLogicalValues *logical_values,
const ExecutionOptions &eo 
)
private

Definition at line 1896 of file RelAlgExecutor.cpp.

References CHECK, CHECK_EQ, CHECK_GE, CPU, DEBUG_TIMER, EMPTY_KEY_64, ResultSet::executor_, RelLogicalValues::getNumRows(), RelLogicalValues::getRowsSize(), RelLogicalValues::getTupleType(), RelLogicalValues::getValueAt(), RelLogicalValues::hasRows(), inline_int_null_val(), ExecutionOptions::just_explain, kBIGINT, kCOUNT, kNULLT, Projection, RelAlgNode::setOutputMetainfo(), RelLogicalValues::size(), sz, and RelAlgTranslator::translateLiteral().

1898  {
1899  auto timer = DEBUG_TIMER(__func__);
1900  if (eo.just_explain) {
1901  throw std::runtime_error("EXPLAIN not supported for LogicalValues");
1902  }
1903 
1904  QueryMemoryDescriptor query_mem_desc(executor_,
1905  logical_values->getNumRows(),
1907  /*is_table_function=*/false);
1908 
1909  auto tuple_type = logical_values->getTupleType();
1910  for (size_t i = 0; i < tuple_type.size(); ++i) {
1911  auto& target_meta_info = tuple_type[i];
1912  if (target_meta_info.get_type_info().is_varlen()) {
1913  throw std::runtime_error("Variable length types not supported in VALUES yet.");
1914  }
1915  if (target_meta_info.get_type_info().get_type() == kNULLT) {
1916  // replace w/ bigint
1917  tuple_type[i] =
1918  TargetMetaInfo(target_meta_info.get_resname(), SQLTypeInfo(kBIGINT, false));
1919  }
1920  query_mem_desc.addColSlotInfo(
1921  {std::make_tuple(tuple_type[i].get_type_info().get_size(), 8)});
1922  }
1923  logical_values->setOutputMetainfo(tuple_type);
1924 
1925  std::vector<TargetInfo> target_infos;
1926  for (const auto& tuple_type_component : tuple_type) {
1927  target_infos.emplace_back(TargetInfo{false,
1928  kCOUNT,
1929  tuple_type_component.get_type_info(),
1930  SQLTypeInfo(kNULLT, false),
1931  false,
1932  false});
1933  }
1934  auto rs = std::make_shared<ResultSet>(target_infos,
1936  query_mem_desc,
1937  executor_->getRowSetMemoryOwner(),
1938  executor_);
1939 
1940  if (logical_values->hasRows()) {
1941  CHECK_EQ(logical_values->getRowsSize(), logical_values->size());
1942 
1943  auto storage = rs->allocateStorage();
1944  auto buff = storage->getUnderlyingBuffer();
1945 
1946  for (size_t i = 0; i < logical_values->getNumRows(); i++) {
1947  std::vector<std::shared_ptr<Analyzer::Expr>> row_literals;
1948  int8_t* ptr = buff + i * query_mem_desc.getRowSize();
1949 
1950  for (size_t j = 0; j < logical_values->getRowsSize(); j++) {
1951  auto rex_literal =
1952  dynamic_cast<const RexLiteral*>(logical_values->getValueAt(i, j));
1953  CHECK(rex_literal);
1954  const auto expr = RelAlgTranslator::translateLiteral(rex_literal);
1955  const auto constant = std::dynamic_pointer_cast<Analyzer::Constant>(expr);
1956  CHECK(constant);
1957 
1958  if (constant->get_is_null()) {
1959  CHECK(!target_infos[j].sql_type.is_varlen());
1960  *reinterpret_cast<int64_t*>(ptr) =
1961  inline_int_null_val(target_infos[j].sql_type);
1962  } else {
1963  const auto ti = constant->get_type_info();
1964  const auto datum = constant->get_constval();
1965 
1966  // Initialize the entire 8-byte slot
1967  *reinterpret_cast<int64_t*>(ptr) = EMPTY_KEY_64;
1968 
1969  const auto sz = ti.get_size();
1970  CHECK_GE(sz, int(0));
1971  std::memcpy(ptr, &datum, sz);
1972  }
1973  ptr += 8;
1974  }
1975  }
1976  }
1977  return {rs, tuple_type};
1978 }
const std::vector< TargetMetaInfo > getTupleType() const
#define CHECK_EQ(x, y)
Definition: Logger.h:205
#define EMPTY_KEY_64
size_t size() const override
#define CHECK_GE(x, y)
Definition: Logger.h:210
int64_t const int32_t sz
static std::shared_ptr< Analyzer::Expr > translateLiteral(const RexLiteral *)
void setOutputMetainfo(const std::vector< TargetMetaInfo > &targets_metainfo) const
bool hasRows() const
Definition: sqldefs.h:76
const RexScalar * getValueAt(const size_t row_idx, const size_t col_idx) const
#define CHECK(condition)
Definition: Logger.h:197
#define DEBUG_TIMER(name)
Definition: Logger.h:313
int64_t inline_int_null_val(const SQL_TYPE_INFO &ti)
size_t getRowsSize() const
Executor * executor_
size_t getNumRows() const
+ Here is the call graph for this function:

◆ executeModify()

ExecutionResult RelAlgExecutor::executeModify ( const RelModify *modify,
const ExecutionOptions &eo 
)
private

Definition at line 2028 of file RelAlgExecutor.cpp.

References CPU, DEBUG_TIMER, ResultSet::executor_, and ExecutionOptions::just_explain.

2029  {
2030  auto timer = DEBUG_TIMER(__func__);
2031  if (eo.just_explain) {
2032  throw std::runtime_error("EXPLAIN not supported for ModifyTable");
2033  }
2034 
2035  auto rs = std::make_shared<ResultSet>(TargetInfoList{},
2038  executor_->getRowSetMemoryOwner(),
2039  executor_);
2040 
2041  std::vector<TargetMetaInfo> empty_targets;
2042  return {rs, empty_targets};
2043 }
std::vector< TargetInfo > TargetInfoList
#define DEBUG_TIMER(name)
Definition: Logger.h:313
Executor * executor_

◆ executeProject()

ExecutionResult RelAlgExecutor::executeProject ( const RelProject *project,
const CompilationOptions &co,
const ExecutionOptions &eo,
RenderInfo *render_info,
const int64_t queue_time_ms,
const std::optional< size_t > previous_count 
)
private

Definition at line 1620 of file RelAlgExecutor.cpp.

References CHECK, CHECK_EQ, CPU, DEBUG_TIMER, Default, get_temporary_table(), RelAlgNode::getInput(), RelAlgNode::getOutputMetainfo(), RelAlgNode::inputCount(), and RelProject::isSimple().

1626  {
1627  auto timer = DEBUG_TIMER(__func__);
1628  auto work_unit = createProjectWorkUnit(project, {{}, SortAlgorithm::Default, 0, 0}, eo);
1629  CompilationOptions co_project = co;
1630  if (project->isSimple()) {
1631  CHECK_EQ(size_t(1), project->inputCount());
1632  const auto input_ra = project->getInput(0);
1633  if (dynamic_cast<const RelSort*>(input_ra)) {
1634  co_project.device_type = ExecutorDeviceType::CPU;
1635  const auto& input_table =
1636  get_temporary_table(&temporary_tables_, -input_ra->getId());
1637  CHECK(input_table);
1638  work_unit.exe_unit.scan_limit =
1639  std::min(input_table->getLimit(), input_table->rowCount());
1640  }
1641  }
1642  return executeWorkUnit(work_unit,
1643  project->getOutputMetainfo(),
1644  false,
1645  co_project,
1646  eo,
1647  render_info,
1648  queue_time_ms,
1649  previous_count);
1650 }
#define CHECK_EQ(x, y)
Definition: Logger.h:205
WorkUnit createProjectWorkUnit(const RelProject *, const SortInfo &, const ExecutionOptions &eo)
TemporaryTables temporary_tables_
const ResultSetPtr & get_temporary_table(const TemporaryTables *temporary_tables, const int table_id)
Definition: Execute.h:191
bool isSimple() const
const std::vector< TargetMetaInfo > & getOutputMetainfo() const
const size_t inputCount() const
#define CHECK(condition)
Definition: Logger.h:197
#define DEBUG_TIMER(name)
Definition: Logger.h:313
const RelAlgNode * getInput(const size_t idx) const
ExecutionResult executeWorkUnit(const WorkUnit &work_unit, const std::vector< TargetMetaInfo > &targets_meta, const bool is_agg, const CompilationOptions &co_in, const ExecutionOptions &eo, RenderInfo *, const int64_t queue_time_ms, const std::optional< size_t > previous_count=std::nullopt)
+ Here is the call graph for this function:

◆ executeRelAlgQuery()

ExecutionResult RelAlgExecutor::executeRelAlgQuery ( const CompilationOptions &co,
const ExecutionOptions &eo,
const bool just_explain_plan,
RenderInfo *render_info 
)

Definition at line 152 of file RelAlgExecutor.cpp.

References CHECK, DEBUG_TIMER, g_allow_cpu_retry, logger::INFO, INJECT_TIMER, LOG, CompilationOptions::makeCpuOnly(), and RenderInfo::setForceNonInSituData().

155  {
156  CHECK(query_dag_);
157  auto timer = DEBUG_TIMER(__func__);
159 
160  try {
161  return executeRelAlgQueryNoRetry(co, eo, just_explain_plan, render_info);
162  } catch (const QueryMustRunOnCpu&) {
163  if (!g_allow_cpu_retry) {
164  throw;
165  }
166  }
167  LOG(INFO) << "Query unable to run in GPU mode, retrying on CPU";
168  auto co_cpu = CompilationOptions::makeCpuOnly(co);
169 
170  if (render_info) {
171  render_info->setForceNonInSituData();
172  }
173  return executeRelAlgQueryNoRetry(co_cpu, eo, just_explain_plan, render_info);
174 }
void setForceNonInSituData()
Definition: RenderInfo.cpp:45
#define LOG(tag)
Definition: Logger.h:188
static CompilationOptions makeCpuOnly(const CompilationOptions &in)
#define INJECT_TIMER(DESC)
Definition: measure.h:93
ExecutionResult executeRelAlgQueryNoRetry(const CompilationOptions &co, const ExecutionOptions &eo, const bool just_explain_plan, RenderInfo *render_info)
ExecutionResult executeRelAlgQuery(const CompilationOptions &co, const ExecutionOptions &eo, const bool just_explain_plan, RenderInfo *render_info)
#define CHECK(condition)
Definition: Logger.h:197
#define DEBUG_TIMER(name)
Definition: Logger.h:313
std::unique_ptr< RelAlgDagBuilder > query_dag_
bool g_allow_cpu_retry
Definition: Execute.cpp:79
+ Here is the call graph for this function:

◆ executeRelAlgQueryNoRetry()

ExecutionResult RelAlgExecutor::executeRelAlgQueryNoRetry ( const CompilationOptions &co,
const ExecutionOptions &eo,
const bool just_explain_plan,
RenderInfo *render_info 
)
private

Definition at line 185 of file RelAlgExecutor.cpp.

References ExecutionOptions::allow_runtime_query_interrupt, CHECK, DEBUG_TIMER, RenderInfo::disallow_in_situ_only_if_final_ED_is_aggregate, executeRelAlgSeq(), ResultSet::executor_, ExecutionOptions::find_push_down_candidates, g_enable_dynamic_watchdog, g_enable_runtime_query_interrupt, g_runtime_query_interrupt_frequency, anonymous_namespace{RelAlgExecutor.cpp}::get_physical_inputs(), get_physical_table_inputs(), INJECT_TIMER, join(), run_benchmark_import::result, RenderInfo::setInSituDataIfUnset(), ResultSet::sort(), timer_start(), timer_stop(), to_string(), Executor::UNITARY_EXECUTOR_ID, and VLOG.

188  {
190  auto timer = DEBUG_TIMER(__func__);
191  auto timer_setup = DEBUG_TIMER("Query pre-execution steps");
192 
193  query_dag_->resetQueryExecutionState();
194  const auto& ra = query_dag_->getRootNode();
195 
196  // capture the lock acquistion time
197  auto clock_begin = timer_start();
199  executor_->resetInterrupt();
200  }
201  std::string query_session = "";
202  std::string query_str = "N/A";
204  // a request of query execution without session id can happen, i.e., test query
205  // if so, we turn back to the original way: a runtime query interrupt
206  // without per-session management (as similar to dynamic watchdog)
207  mapd_shared_lock<mapd_shared_mutex> session_read_lock(
208  executor_->executor_session_mutex_);
209  if (query_state_ != nullptr && query_state_->getConstSessionInfo() != nullptr) {
210  query_session = query_state_->getConstSessionInfo()->get_session_id();
211  query_str = query_state_->getQueryStr();
212  } else if (executor_->getCurrentQuerySession(session_read_lock) != query_session) {
213  query_session = executor_->getCurrentQuerySession(session_read_lock);
214  }
215 
216  session_read_lock.unlock();
217  if (query_session != "") {
218  // if session is valid, then we allow per-session runtime query interrupt
219  mapd_unique_lock<mapd_shared_mutex> session_write_lock(
220  executor_->executor_session_mutex_);
221  executor_->addToQuerySessionList(query_session, query_str, session_write_lock);
222  session_write_lock.unlock();
223  // hybrid spinlock. if it fails to acquire a lock, then
224  // it sleeps {g_runtime_query_interrupt_frequency} millisecond.
225  while (executor_->execute_spin_lock_.test_and_set(std::memory_order_acquire)) {
226  // failed to get the spinlock: check whether query is interrupted
227  mapd_shared_lock<mapd_shared_mutex> session_read_lock(
228  executor_->executor_session_mutex_);
229  bool isQueryInterrupted =
230  executor_->checkIsQuerySessionInterrupted(query_session, session_read_lock);
231  session_read_lock.unlock();
232  if (isQueryInterrupted) {
233  mapd_unique_lock<mapd_shared_mutex> session_write_lock(
234  executor_->executor_session_mutex_);
235  executor_->removeFromQuerySessionList(query_session, session_write_lock);
236  session_write_lock.unlock();
237  throw std::runtime_error(
238  "Query execution has been interrupted (pending query)");
239  }
240  // here it fails to acquire the lock
241  std::this_thread::sleep_for(
242  std::chrono::milliseconds(g_runtime_query_interrupt_frequency));
243  };
244  }
245  // currently, atomic_flag does not provide a way to get its current status,
246  // i.e., spinlock.is_locked(), so we additionally lock the execute_mutex_
247  // right after acquiring spinlock to let other part of the code can know
248  // whether there exists a running query on the executor
249  }
250  auto aquire_execute_mutex = [](Executor* executor) -> ExecutorMutexHolder {
251  ExecutorMutexHolder ret;
252  if (executor->executor_id_ == Executor::UNITARY_EXECUTOR_ID) {
253  // Only one unitary executor can run at a time
254  ret.unique_lock = mapd_unique_lock<mapd_shared_mutex>(executor->execute_mutex_);
255  } else {
256  ret.shared_lock = mapd_shared_lock<mapd_shared_mutex>(executor->execute_mutex_);
257  }
258  return ret;
259  };
260  auto lock = aquire_execute_mutex(executor_);
261  ScopeGuard clearRuntimeInterruptStatus = [this] {
262  // reset the runtime query interrupt status
264  mapd_shared_lock<mapd_shared_mutex> session_read_lock(
265  executor_->executor_session_mutex_);
266  std::string curSession = executor_->getCurrentQuerySession(session_read_lock);
267  session_read_lock.unlock();
268  mapd_unique_lock<mapd_shared_mutex> session_write_lock(
269  executor_->executor_session_mutex_);
270  executor_->removeFromQuerySessionList(curSession, session_write_lock);
271  executor_->invalidateRunningQuerySession(session_write_lock);
272  executor_->execute_spin_lock_.clear(std::memory_order_release);
273  session_write_lock.unlock();
274  executor_->resetInterrupt();
275  VLOG(1) << "RESET runtime query interrupt status of Executor " << this;
276  }
277  };
278 
280  // check whether this query session is already interrupted
281  // this case occurs when there is very short gap between being interrupted and
282  // taking the execute lock
283  // if so we interrupt the query session and remove it from the running session list
284  mapd_shared_lock<mapd_shared_mutex> session_read_lock(
285  executor_->executor_session_mutex_);
286  bool isAlreadyInterrupted =
287  executor_->checkIsQuerySessionInterrupted(query_session, session_read_lock);
288  session_read_lock.unlock();
289  if (isAlreadyInterrupted) {
290  mapd_unique_lock<mapd_shared_mutex> session_write_lock(
291  executor_->executor_session_mutex_);
292  executor_->removeFromQuerySessionList(query_session, session_write_lock);
293  session_write_lock.unlock();
294  throw std::runtime_error("Query execution has been interrupted");
295  }
296 
297  // make sure to set the running session ID
298  mapd_unique_lock<mapd_shared_mutex> session_write_lock(
299  executor_->executor_session_mutex_);
300  executor_->invalidateRunningQuerySession(session_write_lock);
301  executor_->setCurrentQuerySession(query_session, session_write_lock);
302  session_write_lock.unlock();
303  }
304 
305  int64_t queue_time_ms = timer_stop(clock_begin);
306  ScopeGuard row_set_holder = [this] { cleanupPostExecution(); };
307  const auto phys_inputs = get_physical_inputs(cat_, &ra);
308  const auto phys_table_ids = get_physical_table_inputs(&ra);
309  executor_->setCatalog(&cat_);
310  executor_->setupCaching(phys_inputs, phys_table_ids);
311 
312  ScopeGuard restore_metainfo_cache = [this] { executor_->clearMetaInfoCache(); };
313  auto ed_seq = RaExecutionSequence(&ra);
314 
315  if (just_explain_plan) {
316  std::stringstream ss;
317  std::vector<const RelAlgNode*> nodes;
318  for (size_t i = 0; i < ed_seq.size(); i++) {
319  nodes.emplace_back(ed_seq.getDescriptor(i)->getBody());
320  }
321  size_t ctr = nodes.size();
322  size_t tab_ctr = 0;
323  for (auto& body : boost::adaptors::reverse(nodes)) {
324  const auto index = ctr--;
325  const auto tabs = std::string(tab_ctr++, '\t');
326  CHECK(body);
327  ss << tabs << std::to_string(index) << " : " << body->toString() << "\n";
328  if (auto sort = dynamic_cast<const RelSort*>(body)) {
329  ss << tabs << " : " << sort->getInput(0)->toString() << "\n";
330  }
331  if (dynamic_cast<const RelProject*>(body) ||
332  dynamic_cast<const RelCompound*>(body)) {
333  if (auto join = dynamic_cast<const RelLeftDeepInnerJoin*>(body->getInput(0))) {
334  ss << tabs << " : " << join->toString() << "\n";
335  }
336  }
337  }
338  auto rs = std::make_shared<ResultSet>(ss.str());
339  return {rs, {}};
340  }
341 
342  if (render_info) {
343  // set render to be non-insitu in certain situations.
345  ed_seq.size() > 1) {
346  // old logic
347  // disallow if more than one ED
348  render_info->setInSituDataIfUnset(false);
349  }
350  }
351 
352  if (eo.find_push_down_candidates) {
353  // this extra logic is mainly due to current limitations on multi-step queries
354  // and/or subqueries.
356  ed_seq, co, eo, render_info, queue_time_ms);
357  }
358  timer_setup.stop();
359 
360  // Dispatch the subqueries first
361  for (auto subquery : getSubqueries()) {
362  const auto subquery_ra = subquery->getRelAlg();
363  CHECK(subquery_ra);
364  if (subquery_ra->hasContextData()) {
365  continue;
366  }
367  // Execute the subquery and cache the result.
368  RelAlgExecutor ra_executor(executor_, cat_, query_state_);
369  RaExecutionSequence subquery_seq(subquery_ra);
370  auto result = ra_executor.executeRelAlgSeq(subquery_seq, co, eo, nullptr, 0);
371  subquery->setExecutionResult(std::make_shared<ExecutionResult>(result));
372  }
373  return executeRelAlgSeq(ed_seq, co, eo, render_info, queue_time_ms);
374 }
ExecutionResult executeRelAlgSeq(const RaExecutionSequence &seq, const CompilationOptions &co, const ExecutionOptions &eo, RenderInfo *render_info, const int64_t queue_time_ms, const bool with_existing_temp_tables=false)
ExecutionResult executeRelAlgQueryWithFilterPushDown(const RaExecutionSequence &seq, const CompilationOptions &co, const ExecutionOptions &eo, RenderInfo *render_info, const int64_t queue_time_ms)
bool setInSituDataIfUnset(const bool is_in_situ_data)
Definition: RenderInfo.cpp:98
std::string join(T const &container, std::string const &delim)
TypeR::rep timer_stop(Type clock_begin)
Definition: measure.h:48
bool g_enable_dynamic_watchdog
Definition: Execute.cpp:75
const std::vector< std::shared_ptr< RexSubQuery > > & getSubqueries() const noexcept
std::string to_string(char const *&&v)
bool disallow_in_situ_only_if_final_ED_is_aggregate
Definition: RenderInfo.h:41
const bool find_push_down_candidates
#define INJECT_TIMER(DESC)
Definition: measure.h:93
A container for relational algebra descriptors defining the execution order for a relational algebra ...
const Catalog_Namespace::Catalog & cat_
ExecutionResult executeRelAlgQueryNoRetry(const CompilationOptions &co, const ExecutionOptions &eo, const bool just_explain_plan, RenderInfo *render_info)
unsigned g_runtime_query_interrupt_frequency
Definition: Execute.cpp:110
std::shared_ptr< const query_state::QueryState > query_state_
std::unordered_set< PhysicalInput > get_physical_inputs(const Catalog_Namespace::Catalog &cat, const RelAlgNode *ra)
#define CHECK(condition)
Definition: Logger.h:197
#define DEBUG_TIMER(name)
Definition: Logger.h:313
std::unique_ptr< RelAlgDagBuilder > query_dag_
void cleanupPostExecution()
const bool allow_runtime_query_interrupt
Executor * executor_
bool g_enable_runtime_query_interrupt
Definition: Execute.cpp:108
std::unordered_set< int > get_physical_table_inputs(const RelAlgNode *ra)
#define VLOG(n)
Definition: Logger.h:291
Type timer_start()
Definition: measure.h:42
static const ExecutorId UNITARY_EXECUTOR_ID
Definition: Execute.h:337
+ Here is the call graph for this function:

◆ executeRelAlgQuerySingleStep()

FirstStepExecutionResult RelAlgExecutor::executeRelAlgQuerySingleStep ( const RaExecutionSequence &seq,
const size_t step_idx,
const CompilationOptions &co,
const ExecutionOptions &eo,
RenderInfo *render_info 
)

Definition at line 414 of file RelAlgExecutor.cpp.

References ExecutionOptions::allow_loop_joins, ExecutionOptions::allow_multifrag, ExecutionOptions::allow_runtime_query_interrupt, CHECK, CHECK_EQ, anonymous_namespace{RelAlgExecutor.cpp}::check_sort_node_source_constraint(), ExecutionOptions::dynamic_watchdog_time_limit, ResultSet::executor_, ExecutionOptions::executor_type, ExecutionOptions::find_push_down_candidates, RaExecutionSequence::getDescriptor(), ExecutionOptions::gpu_input_mem_limit_percent, INJECT_TIMER, ExecutionOptions::jit_debug, ExecutionOptions::just_calcite_explain, ExecutionOptions::just_explain, ExecutionOptions::just_validate, anonymous_namespace{RelAlgExecutor.cpp}::node_is_aggregate(), ExecutionOptions::output_columnar_hint, Reduce, ExecutionOptions::runtime_query_interrupt_frequency, GroupByAndAggregate::shard_count_for_top_groups(), ResultSet::sort(), Union, ExecutionOptions::with_dynamic_watchdog, and ExecutionOptions::with_watchdog.

419  {
420  INJECT_TIMER(executeRelAlgQueryStep);
421  auto exe_desc_ptr = seq.getDescriptor(step_idx);
422  CHECK(exe_desc_ptr);
423  const auto sort = dynamic_cast<const RelSort*>(exe_desc_ptr->getBody());
424 
425  size_t shard_count{0};
426  auto merge_type = [&shard_count](const RelAlgNode* body) -> MergeType {
427  return node_is_aggregate(body) && !shard_count ? MergeType::Reduce : MergeType::Union;
428  };
429 
430  if (sort) {
432  const auto source_work_unit = createSortInputWorkUnit(sort, eo);
434  source_work_unit.exe_unit, *executor_->getCatalog());
435  if (!shard_count) {
436  // No point in sorting on the leaf, only execute the input to the sort node.
437  CHECK_EQ(size_t(1), sort->inputCount());
438  const auto source = sort->getInput(0);
439  if (sort->collationCount() || node_is_aggregate(source)) {
440  auto temp_seq = RaExecutionSequence(std::make_unique<RaExecutionDesc>(source));
441  CHECK_EQ(temp_seq.size(), size_t(1));
442  ExecutionOptions eo_copy = {
444  eo.allow_multifrag,
445  eo.just_explain,
446  eo.allow_loop_joins,
447  eo.with_watchdog,
448  eo.jit_debug,
449  eo.just_validate || sort->isEmptyResult(),
457  eo.executor_type,
458  };
459  // Use subseq to avoid clearing existing temporary tables
460  return {
461  executeRelAlgSubSeq(temp_seq, std::make_pair(0, 1), co, eo_copy, nullptr, 0),
462  merge_type(source),
463  source->getId(),
464  false};
465  }
466  }
467  }
468  return {executeRelAlgSubSeq(seq,
469  std::make_pair(step_idx, step_idx + 1),
470  co,
471  eo,
472  render_info,
474  merge_type(exe_desc_ptr->getBody()),
475  exe_desc_ptr->getBody()->getId(),
476  false};
477 }
int64_t queue_time_ms_
#define CHECK_EQ(x, y)
Definition: Logger.h:205
const unsigned runtime_query_interrupt_frequency
void check_sort_node_source_constraint(const RelSort *sort)
const bool allow_multifrag
const bool find_push_down_candidates
const bool just_validate
ExecutorType executor_type
#define INJECT_TIMER(DESC)
Definition: measure.h:93
const bool with_dynamic_watchdog
const double gpu_input_mem_limit_percent
MergeType
A container for relational algebra descriptors defining the execution order for a relational algebra ...
ExecutionResult executeRelAlgSubSeq(const RaExecutionSequence &seq, const std::pair< size_t, size_t > interval, const CompilationOptions &co, const ExecutionOptions &eo, RenderInfo *render_info, const int64_t queue_time_ms)
bool node_is_aggregate(const RelAlgNode *ra)
const bool just_calcite_explain
const bool allow_loop_joins
RaExecutionDesc * getDescriptor(size_t idx) const
#define CHECK(condition)
Definition: Logger.h:197
const bool allow_runtime_query_interrupt
const unsigned dynamic_watchdog_time_limit
Executor * executor_
WorkUnit createSortInputWorkUnit(const RelSort *, const ExecutionOptions &eo)
const bool with_watchdog
static size_t shard_count_for_top_groups(const RelAlgExecutionUnit &ra_exe_unit, const Catalog_Namespace::Catalog &catalog)
+ Here is the call graph for this function:

◆ executeRelAlgQueryWithFilterPushDown()

ExecutionResult RelAlgExecutor::executeRelAlgQueryWithFilterPushDown ( const RaExecutionSequence & seq,
const CompilationOptions & co,
const ExecutionOptions & eo,
RenderInfo * render_info,
const int64_t  queue_time_ms
)

Definition at line 145 of file JoinFilterPushDown.cpp.

References ExecutionOptions::allow_loop_joins, ExecutionOptions::allow_multifrag, ExecutionOptions::allow_runtime_query_interrupt, CHECK, ExecutionOptions::dynamic_watchdog_time_limit, executeRelAlgSeq(), ExecutionOptions::find_push_down_candidates, ExecutionOptions::gpu_input_mem_limit_percent, ExecutionOptions::jit_debug, ExecutionOptions::just_calcite_explain, ExecutionOptions::just_explain, ExecutionOptions::just_validate, ExecutionOptions::output_columnar_hint, run_benchmark_import::result, RaExecutionSequence::size(), ExecutionOptions::with_dynamic_watchdog, and ExecutionOptions::with_watchdog.

150  {
151  // we currently do not fully support filter push down with
152  // multi-step execution and/or with subqueries
153  // TODO(Saman): add proper caching to enable filter push down for all cases
154  const auto& subqueries = getSubqueries();
155  if (seq.size() > 1 || !subqueries.empty()) {
156  if (eo.just_calcite_explain) {
157  return ExecutionResult(std::vector<PushedDownFilterInfo>{},
159  }
160  const ExecutionOptions eo_modified{eo.output_columnar_hint,
161  eo.allow_multifrag,
162  eo.just_explain,
163  eo.allow_loop_joins,
164  eo.with_watchdog,
165  eo.jit_debug,
166  eo.just_validate,
169  /*find_push_down_candidates=*/false,
170  /*just_calcite_explain=*/false,
173 
174  // Dispatch the subqueries first
175  for (auto subquery : subqueries) {
176  // Execute the subquery and cache the result.
177  RelAlgExecutor ra_executor(executor_, cat_);
178  const auto subquery_ra = subquery->getRelAlg();
179  CHECK(subquery_ra);
180  RaExecutionSequence subquery_seq(subquery_ra);
181  auto result = ra_executor.executeRelAlgSeq(subquery_seq, co, eo, nullptr, 0);
182  subquery->setExecutionResult(std::make_shared<ExecutionResult>(result));
183  }
184  return executeRelAlgSeq(seq, co, eo_modified, render_info, queue_time_ms);
185  }
186  // else
187 
188  // Dispatch the subqueries first
189  for (auto subquery : subqueries) {
190  // Execute the subquery and cache the result.
191  RelAlgExecutor ra_executor(executor_, cat_);
192  const auto subquery_ra = subquery->getRelAlg();
193  CHECK(subquery_ra);
194  RaExecutionSequence subquery_seq(subquery_ra);
195  auto result = ra_executor.executeRelAlgSeq(subquery_seq, co, eo, nullptr, 0);
196  subquery->setExecutionResult(std::make_shared<ExecutionResult>(result));
197  }
198  return executeRelAlgSeq(seq, co, eo, render_info, queue_time_ms);
199 }
ExecutionResult executeRelAlgSeq(const RaExecutionSequence &seq, const CompilationOptions &co, const ExecutionOptions &eo, RenderInfo *render_info, const int64_t queue_time_ms, const bool with_existing_temp_tables=false)
const std::vector< std::shared_ptr< RexSubQuery > > & getSubqueries() const noexcept
const bool allow_multifrag
const bool find_push_down_candidates
const bool just_validate
const bool with_dynamic_watchdog
const double gpu_input_mem_limit_percent
A container for relational algebra descriptors defining the execution order for a relational algebra ...
const Catalog_Namespace::Catalog & cat_
const bool just_calcite_explain
const bool allow_loop_joins
#define CHECK(condition)
Definition: Logger.h:197
const bool allow_runtime_query_interrupt
const unsigned dynamic_watchdog_time_limit
Executor * executor_
const bool with_watchdog
+ Here is the call graph for this function:

◆ executeRelAlgSeq()

ExecutionResult RelAlgExecutor::executeRelAlgSeq ( const RaExecutionSequence & seq,
const CompilationOptions & co,
const ExecutionOptions & eo,
RenderInfo * render_info,
const int64_t  queue_time_ms,
const bool  with_existing_temp_tables = false
)

Definition at line 496 of file RelAlgExecutor.cpp.

References CHECK, DEBUG_TIMER, RaExecutionSequence::empty(), ResultSet::executor_, ExecutionOptions::executor_type, g_enable_interop, RaExecutionDesc::getBody(), RaExecutionSequence::getDescriptor(), RaExecutionDesc::getResult(), logger::INFO, INJECT_TIMER, ExecutionOptions::just_explain, LOG, RaExecutionSequence::size(), and VLOG.

Referenced by executeRelAlgQueryNoRetry(), and executeRelAlgQueryWithFilterPushDown().

501  {
503  auto timer = DEBUG_TIMER(__func__);
504  if (!with_existing_temp_tables) {
505  decltype(temporary_tables_)().swap(temporary_tables_);
506  }
507  decltype(target_exprs_owned_)().swap(target_exprs_owned_);
508  executor_->catalog_ = &cat_;
509  executor_->temporary_tables_ = &temporary_tables_;
510 
511  time(&now_);
512  CHECK(!seq.empty());
513  const auto exec_desc_count = eo.just_explain ? size_t(1) : seq.size();
514 
515  for (size_t i = 0; i < exec_desc_count; i++) {
516  VLOG(1) << "Executing query step " << i;
517  // only render on the last step
518  try {
519  executeRelAlgStep(seq,
520  i,
521  co,
522  eo,
523  (i == exec_desc_count - 1) ? render_info : nullptr,
524  queue_time_ms);
525  } catch (const NativeExecutionError&) {
526  if (!g_enable_interop) {
527  throw;
528  }
529  auto eo_extern = eo;
530  eo_extern.executor_type = ::ExecutorType::Extern;
531  auto exec_desc_ptr = seq.getDescriptor(i);
532  const auto body = exec_desc_ptr->getBody();
533  const auto compound = dynamic_cast<const RelCompound*>(body);
534  if (compound && (compound->getGroupByCount() || compound->isAggregate())) {
535  LOG(INFO) << "Also failed to run the query using interoperability";
536  throw;
537  }
538  executeRelAlgStep(seq,
539  i,
540  co,
541  eo_extern,
542  (i == exec_desc_count - 1) ? render_info : nullptr,
543  queue_time_ms);
544  }
545  }
546 
547  return seq.getDescriptor(exec_desc_count - 1)->getResult();
548 }
const ExecutionResult & getResult() const
ExecutionResult executeRelAlgSeq(const RaExecutionSequence &seq, const CompilationOptions &co, const ExecutionOptions &eo, RenderInfo *render_info, const int64_t queue_time_ms, const bool with_existing_temp_tables=false)
#define LOG(tag)
Definition: Logger.h:188
TemporaryTables temporary_tables_
const RelAlgNode * getBody() const
bool g_enable_interop
std::vector< std::shared_ptr< Analyzer::Expr > > target_exprs_owned_
ExecutorType executor_type
#define INJECT_TIMER(DESC)
Definition: measure.h:93
const Catalog_Namespace::Catalog & cat_
void executeRelAlgStep(const RaExecutionSequence &seq, const size_t step_idx, const CompilationOptions &, const ExecutionOptions &, RenderInfo *, const int64_t queue_time_ms)
RaExecutionDesc * getDescriptor(size_t idx) const
#define CHECK(condition)
Definition: Logger.h:197
#define DEBUG_TIMER(name)
Definition: Logger.h:313
Executor * executor_
#define VLOG(n)
Definition: Logger.h:291
+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ executeRelAlgStep()

void RelAlgExecutor::executeRelAlgStep ( const RaExecutionSequence & seq,
const size_t  step_idx,
const CompilationOptions & co,
const ExecutionOptions & eo,
RenderInfo * render_info,
const int64_t  queue_time_ms
)
private

Definition at line 575 of file RelAlgExecutor.cpp.

References ExecutionOptions::allow_loop_joins, ExecutionOptions::allow_multifrag, ExecutionOptions::allow_runtime_query_interrupt, CHECK, DEBUG_TIMER, ExecutionOptions::dynamic_watchdog_time_limit, ResultSet::executor_, ExecutionOptions::executor_type, logger::FATAL, ExecutionOptions::find_push_down_candidates, g_cluster, g_enable_union, g_skip_intermediate_count, RaExecutionSequence::getDescriptor(), RelAlgNode::getId(), ExecutionOptions::gpu_input_mem_limit_percent, INJECT_TIMER, ExecutionOptions::jit_debug, ExecutionOptions::just_calcite_explain, ExecutionOptions::just_explain, ExecutionOptions::just_validate, LOG, ExecutionOptions::outer_fragment_indices, ExecutionOptions::output_columnar_hint, WindowProjectNodeContext::reset(), ExecutionOptions::runtime_query_interrupt_frequency, ResultSet::sort(), VLOG, ExecutionOptions::with_dynamic_watchdog, and ExecutionOptions::with_watchdog.

580  {
582  auto timer = DEBUG_TIMER(__func__);
584  auto exec_desc_ptr = seq.getDescriptor(step_idx);
585  CHECK(exec_desc_ptr);
586  auto& exec_desc = *exec_desc_ptr;
587  const auto body = exec_desc.getBody();
588  if (body->isNop()) {
589  handleNop(exec_desc);
590  return;
591  }
592  const ExecutionOptions eo_work_unit{
594  eo.allow_multifrag,
595  eo.just_explain,
596  eo.allow_loop_joins,
597  eo.with_watchdog && (step_idx == 0 || dynamic_cast<const RelProject*>(body)),
598  eo.jit_debug,
599  eo.just_validate,
607  eo.executor_type,
608  step_idx == 0 ? eo.outer_fragment_indices : std::vector<size_t>()};
609 
610  const auto compound = dynamic_cast<const RelCompound*>(body);
611  if (compound) {
612  if (compound->isDeleteViaSelect()) {
613  executeDelete(compound, co, eo_work_unit, queue_time_ms);
614  } else if (compound->isUpdateViaSelect()) {
615  executeUpdate(compound, co, eo_work_unit, queue_time_ms);
616  } else {
617  exec_desc.setResult(
618  executeCompound(compound, co, eo_work_unit, render_info, queue_time_ms));
619  VLOG(3) << "Returned from executeCompound(), addTemporaryTable("
620  << static_cast<int>(-compound->getId()) << ", ...)"
621  << " exec_desc.getResult().getDataPtr()->rowCount()="
622  << exec_desc.getResult().getDataPtr()->rowCount();
623  if (exec_desc.getResult().isFilterPushDownEnabled()) {
624  return;
625  }
626  addTemporaryTable(-compound->getId(), exec_desc.getResult().getDataPtr());
627  }
628  return;
629  }
630  const auto project = dynamic_cast<const RelProject*>(body);
631  if (project) {
632  if (project->isDeleteViaSelect()) {
633  executeDelete(project, co, eo_work_unit, queue_time_ms);
634  } else if (project->isUpdateViaSelect()) {
635  executeUpdate(project, co, eo_work_unit, queue_time_ms);
636  } else {
637  std::optional<size_t> prev_count;
638  // Disabling the intermediate count optimization in distributed, as the previous
639  // execution descriptor will likely not hold the aggregated result.
640  if (g_skip_intermediate_count && step_idx > 0 && !g_cluster) {
641  auto prev_exec_desc = seq.getDescriptor(step_idx - 1);
642  CHECK(prev_exec_desc);
643  RelAlgNode const* prev_body = prev_exec_desc->getBody();
644  // This optimization needs to be restricted in its application for UNION, which
645  // can have 2 input nodes in which neither should restrict the count of the other.
646  // However some non-UNION queries are measurably slower with this restriction, so
647  // it is only applied when g_enable_union is true.
648  bool const parent_check =
649  !g_enable_union || project->getInput(0)->getId() == prev_body->getId();
650  // If the previous node produced a reliable count, skip the pre-flight count
651  if (parent_check && (dynamic_cast<const RelCompound*>(prev_body) ||
652  dynamic_cast<const RelLogicalValues*>(prev_body))) {
653  const auto& prev_exe_result = prev_exec_desc->getResult();
654  const auto prev_result = prev_exe_result.getRows();
655  if (prev_result) {
656  prev_count = prev_result->rowCount();
657  VLOG(3) << "Setting output row count for projection node to previous node ("
658  << prev_exec_desc->getBody()->toString() << ") to " << *prev_count;
659  }
660  }
661  }
662  exec_desc.setResult(executeProject(
663  project, co, eo_work_unit, render_info, queue_time_ms, prev_count));
664  VLOG(3) << "Returned from executeProject(), addTemporaryTable("
665  << static_cast<int>(-project->getId()) << ", ...)"
666  << " exec_desc.getResult().getDataPtr()->rowCount()="
667  << exec_desc.getResult().getDataPtr()->rowCount();
668  if (exec_desc.getResult().isFilterPushDownEnabled()) {
669  return;
670  }
671  addTemporaryTable(-project->getId(), exec_desc.getResult().getDataPtr());
672  }
673  return;
674  }
675  const auto aggregate = dynamic_cast<const RelAggregate*>(body);
676  if (aggregate) {
677  exec_desc.setResult(
678  executeAggregate(aggregate, co, eo_work_unit, render_info, queue_time_ms));
679  addTemporaryTable(-aggregate->getId(), exec_desc.getResult().getDataPtr());
680  return;
681  }
682  const auto filter = dynamic_cast<const RelFilter*>(body);
683  if (filter) {
684  exec_desc.setResult(
685  executeFilter(filter, co, eo_work_unit, render_info, queue_time_ms));
686  addTemporaryTable(-filter->getId(), exec_desc.getResult().getDataPtr());
687  return;
688  }
689  const auto sort = dynamic_cast<const RelSort*>(body);
690  if (sort) {
691  exec_desc.setResult(executeSort(sort, co, eo_work_unit, render_info, queue_time_ms));
692  if (exec_desc.getResult().isFilterPushDownEnabled()) {
693  return;
694  }
695  addTemporaryTable(-sort->getId(), exec_desc.getResult().getDataPtr());
696  return;
697  }
698  const auto logical_values = dynamic_cast<const RelLogicalValues*>(body);
699  if (logical_values) {
700  exec_desc.setResult(executeLogicalValues(logical_values, eo_work_unit));
701  addTemporaryTable(-logical_values->getId(), exec_desc.getResult().getDataPtr());
702  return;
703  }
704  const auto modify = dynamic_cast<const RelModify*>(body);
705  if (modify) {
706  exec_desc.setResult(executeModify(modify, eo_work_unit));
707  return;
708  }
709  const auto logical_union = dynamic_cast<const RelLogicalUnion*>(body);
710  if (logical_union) {
711  exec_desc.setResult(
712  executeUnion(logical_union, seq, co, eo_work_unit, render_info, queue_time_ms));
713  addTemporaryTable(-logical_union->getId(), exec_desc.getResult().getDataPtr());
714  return;
715  }
716  const auto table_func = dynamic_cast<const RelTableFunction*>(body);
717  if (table_func) {
718  exec_desc.setResult(
719  executeTableFunction(table_func, co, eo_work_unit, queue_time_ms));
720  addTemporaryTable(-table_func->getId(), exec_desc.getResult().getDataPtr());
721  return;
722  }
723  LOG(FATAL) << "Unhandled body type: " << body->toString();
724 }
ExecutionResult executeAggregate(const RelAggregate *aggregate, const CompilationOptions &co, const ExecutionOptions &eo, RenderInfo *render_info, const int64_t queue_time_ms)
unsigned getId() const
bool g_skip_intermediate_count
#define LOG(tag)
Definition: Logger.h:188
const unsigned runtime_query_interrupt_frequency
ExecutionResult executeModify(const RelModify *modify, const ExecutionOptions &eo)
void addTemporaryTable(const int table_id, const ResultSetPtr &result)
bool g_enable_union
ExecutionResult executeLogicalValues(const RelLogicalValues *, const ExecutionOptions &)
ExecutionResult executeFilter(const RelFilter *, const CompilationOptions &, const ExecutionOptions &, RenderInfo *, const int64_t queue_time_ms)
void handleNop(RaExecutionDesc &ed)
const bool allow_multifrag
const bool find_push_down_candidates
const bool just_validate
ExecutorType executor_type
#define INJECT_TIMER(DESC)
Definition: measure.h:93
const bool with_dynamic_watchdog
static void reset(Executor *executor)
const double gpu_input_mem_limit_percent
void executeUpdate(const RelAlgNode *node, const CompilationOptions &co, const ExecutionOptions &eo, const int64_t queue_time_ms)
void executeRelAlgStep(const RaExecutionSequence &seq, const size_t step_idx, const CompilationOptions &, const ExecutionOptions &, RenderInfo *, const int64_t queue_time_ms)
ExecutionResult executeTableFunction(const RelTableFunction *, const CompilationOptions &, const ExecutionOptions &, const int64_t queue_time_ms)
const std::vector< size_t > outer_fragment_indices
const bool just_calcite_explain
ExecutionResult executeUnion(const RelLogicalUnion *, const RaExecutionSequence &, const CompilationOptions &, const ExecutionOptions &, RenderInfo *, const int64_t queue_time_ms)
ExecutionResult executeSort(const RelSort *, const CompilationOptions &, const ExecutionOptions &, RenderInfo *, const int64_t queue_time_ms)
ExecutionResult executeProject(const RelProject *, const CompilationOptions &, const ExecutionOptions &, RenderInfo *, const int64_t queue_time_ms, const std::optional< size_t > previous_count)
const bool allow_loop_joins
RaExecutionDesc * getDescriptor(size_t idx) const
#define CHECK(condition)
Definition: Logger.h:197
#define DEBUG_TIMER(name)
Definition: Logger.h:313
ExecutionResult executeCompound(const RelCompound *, const CompilationOptions &, const ExecutionOptions &, RenderInfo *, const int64_t queue_time_ms)
bool g_cluster
const bool allow_runtime_query_interrupt
const unsigned dynamic_watchdog_time_limit
void executeDelete(const RelAlgNode *node, const CompilationOptions &co, const ExecutionOptions &eo_in, const int64_t queue_time_ms)
Executor * executor_
#define VLOG(n)
Definition: Logger.h:291
const bool with_watchdog
+ Here is the call graph for this function:

◆ executeRelAlgSubSeq()

ExecutionResult RelAlgExecutor::executeRelAlgSubSeq ( const RaExecutionSequence & seq,
const std::pair< size_t, size_t >  interval,
const CompilationOptions & co,
const ExecutionOptions & eo,
RenderInfo * render_info,
const int64_t  queue_time_ms
)

Definition at line 550 of file RelAlgExecutor.cpp.

References ResultSet::executor_, RaExecutionSequence::getDescriptor(), RaExecutionDesc::getResult(), and INJECT_TIMER.

556  {
558  executor_->catalog_ = &cat_;
559  executor_->temporary_tables_ = &temporary_tables_;
560 
561  time(&now_);
562  for (size_t i = interval.first; i < interval.second; i++) {
563  // only render on the last step
564  executeRelAlgStep(seq,
565  i,
566  co,
567  eo,
568  (i == interval.second - 1) ? render_info : nullptr,
569  queue_time_ms);
570  }
571 
572  return seq.getDescriptor(interval.second - 1)->getResult();
573 }
const ExecutionResult & getResult() const
TemporaryTables temporary_tables_
#define INJECT_TIMER(DESC)
Definition: measure.h:93
const Catalog_Namespace::Catalog & cat_
ExecutionResult executeRelAlgSubSeq(const RaExecutionSequence &seq, const std::pair< size_t, size_t > interval, const CompilationOptions &co, const ExecutionOptions &eo, RenderInfo *render_info, const int64_t queue_time_ms)
void executeRelAlgStep(const RaExecutionSequence &seq, const size_t step_idx, const CompilationOptions &, const ExecutionOptions &, RenderInfo *, const int64_t queue_time_ms)
RaExecutionDesc * getDescriptor(size_t idx) const
Executor * executor_
+ Here is the call graph for this function:

◆ executeSimpleInsert()

ExecutionResult RelAlgExecutor::executeSimpleInsert ( const Analyzer::Query insert_query)

Definition at line 2045 of file RelAlgExecutor.cpp.

References appendDatum(), DataBlockPtr::arraysPtr, CHECK, CHECK_EQ, checked_malloc(), Fragmenter_Namespace::InsertData::columnIds, CPU, Fragmenter_Namespace::InsertData::data, Fragmenter_Namespace::InsertData::databaseId, ResultSet::executor_, get_column_descriptor(), SQLTypeInfo::get_compression(), SQLTypeInfo::get_elem_type(), Analyzer::Query::get_result_col_list(), Analyzer::Query::get_result_table_id(), SQLTypeInfo::get_size(), Analyzer::Query::get_targetlist(), SQLTypeInfo::get_type(), inline_fixed_encoding_null_val(), anonymous_namespace{RelAlgExecutor.cpp}::insert_one_dict_str(), anonymous_namespace{TypedDataAccessors.h}::is_null(), SQLTypeInfo::is_string(), kARRAY, kBIGINT, kBOOLEAN, kCAST, kCHAR, kDATE, kDECIMAL, kDOUBLE, kENCODING_DICT, kENCODING_NONE, kFLOAT, kINT, kLINESTRING, kMULTIPOLYGON, kNUMERIC, kPOINT, kPOLYGON, kSMALLINT, kTEXT, kTIME, kTIMESTAMP, kTINYINT, kVARCHAR, DataBlockPtr::numbersPtr, Fragmenter_Namespace::InsertData::numRows, anonymous_namespace{TypedDataAccessors.h}::put_null(), anonymous_namespace{TypedDataAccessors.h}::put_null_array(), SHARD_FOR_KEY, DataBlockPtr::stringsPtr, Fragmenter_Namespace::InsertData::tableId, and to_string().

Referenced by Parser::InsertValuesStmt::execute().

2045  {
2046  // Note: We currently obtain an executor for this method, but we do not need it.
2047  // Therefore, we skip the executor state setup in the regular execution path. In the
2048  // future, we will likely want to use the executor to evaluate expressions in the insert
2049  // statement.
2050 
2051  const auto& targets = query.get_targetlist();
2052  const int table_id = query.get_result_table_id();
2053  const auto& col_id_list = query.get_result_col_list();
2054 
2055  std::vector<const ColumnDescriptor*> col_descriptors;
2056  std::vector<int> col_ids;
2057  std::unordered_map<int, std::unique_ptr<uint8_t[]>> col_buffers;
2058  std::unordered_map<int, std::vector<std::string>> str_col_buffers;
2059  std::unordered_map<int, std::vector<ArrayDatum>> arr_col_buffers;
2060 
2061  const auto table_descriptor = cat_.getMetadataForTable(table_id);
2062  CHECK(table_descriptor);
2063  const auto shard_tables = cat_.getPhysicalTablesDescriptors(table_descriptor);
2064  const TableDescriptor* shard{nullptr};
2065 
2066  for (const int col_id : col_id_list) {
2067  const auto cd = get_column_descriptor(col_id, table_id, cat_);
2068  const auto col_enc = cd->columnType.get_compression();
2069  if (cd->columnType.is_string()) {
2070  switch (col_enc) {
2071  case kENCODING_NONE: {
2072  auto it_ok =
2073  str_col_buffers.insert(std::make_pair(col_id, std::vector<std::string>{}));
2074  CHECK(it_ok.second);
2075  break;
2076  }
2077  case kENCODING_DICT: {
2078  const auto dd = cat_.getMetadataForDict(cd->columnType.get_comp_param());
2079  CHECK(dd);
2080  const auto it_ok = col_buffers.emplace(
2081  col_id, std::make_unique<uint8_t[]>(cd->columnType.get_size()));
2082  CHECK(it_ok.second);
2083  break;
2084  }
2085  default:
2086  CHECK(false);
2087  }
2088  } else if (cd->columnType.is_geometry()) {
2089  auto it_ok =
2090  str_col_buffers.insert(std::make_pair(col_id, std::vector<std::string>{}));
2091  CHECK(it_ok.second);
2092  } else if (cd->columnType.is_array()) {
2093  auto it_ok =
2094  arr_col_buffers.insert(std::make_pair(col_id, std::vector<ArrayDatum>{}));
2095  CHECK(it_ok.second);
2096  } else {
2097  const auto it_ok = col_buffers.emplace(
2098  col_id,
2099  std::unique_ptr<uint8_t[]>(
2100  new uint8_t[cd->columnType.get_logical_size()]())); // changed to zero-init
2101  // the buffer
2102  CHECK(it_ok.second);
2103  }
2104  col_descriptors.push_back(cd);
2105  col_ids.push_back(col_id);
2106  }
2107  size_t col_idx = 0;
2109  insert_data.databaseId = cat_.getCurrentDB().dbId;
2110  insert_data.tableId = table_id;
2111  int64_t int_col_val{0};
2112  for (auto target_entry : targets) {
2113  auto col_cv = dynamic_cast<const Analyzer::Constant*>(target_entry->get_expr());
2114  if (!col_cv) {
2115  auto col_cast = dynamic_cast<const Analyzer::UOper*>(target_entry->get_expr());
2116  CHECK(col_cast);
2117  CHECK_EQ(kCAST, col_cast->get_optype());
2118  col_cv = dynamic_cast<const Analyzer::Constant*>(col_cast->get_operand());
2119  }
2120  CHECK(col_cv);
2121  const auto cd = col_descriptors[col_idx];
2122  auto col_datum = col_cv->get_constval();
2123  auto col_type = cd->columnType.get_type();
2124  uint8_t* col_data_bytes{nullptr};
2125  if (!cd->columnType.is_array() && !cd->columnType.is_geometry() &&
2126  (!cd->columnType.is_string() ||
2127  cd->columnType.get_compression() == kENCODING_DICT)) {
2128  const auto col_data_bytes_it = col_buffers.find(col_ids[col_idx]);
2129  CHECK(col_data_bytes_it != col_buffers.end());
2130  col_data_bytes = col_data_bytes_it->second.get();
2131  }
2132  switch (col_type) {
2133  case kBOOLEAN: {
2134  auto col_data = col_data_bytes;
2135  *col_data = col_cv->get_is_null() ? inline_fixed_encoding_null_val(cd->columnType)
2136  : (col_datum.boolval ? 1 : 0);
2137  break;
2138  }
2139  case kTINYINT: {
2140  auto col_data = reinterpret_cast<int8_t*>(col_data_bytes);
2141  *col_data = col_cv->get_is_null() ? inline_fixed_encoding_null_val(cd->columnType)
2142  : col_datum.tinyintval;
2143  int_col_val = col_datum.tinyintval;
2144  break;
2145  }
2146  case kSMALLINT: {
2147  auto col_data = reinterpret_cast<int16_t*>(col_data_bytes);
2148  *col_data = col_cv->get_is_null() ? inline_fixed_encoding_null_val(cd->columnType)
2149  : col_datum.smallintval;
2150  int_col_val = col_datum.smallintval;
2151  break;
2152  }
2153  case kINT: {
2154  auto col_data = reinterpret_cast<int32_t*>(col_data_bytes);
2155  *col_data = col_cv->get_is_null() ? inline_fixed_encoding_null_val(cd->columnType)
2156  : col_datum.intval;
2157  int_col_val = col_datum.intval;
2158  break;
2159  }
2160  case kBIGINT:
2161  case kDECIMAL:
2162  case kNUMERIC: {
2163  auto col_data = reinterpret_cast<int64_t*>(col_data_bytes);
2164  *col_data = col_cv->get_is_null() ? inline_fixed_encoding_null_val(cd->columnType)
2165  : col_datum.bigintval;
2166  int_col_val = col_datum.bigintval;
2167  break;
2168  }
2169  case kFLOAT: {
2170  auto col_data = reinterpret_cast<float*>(col_data_bytes);
2171  *col_data = col_datum.floatval;
2172  break;
2173  }
2174  case kDOUBLE: {
2175  auto col_data = reinterpret_cast<double*>(col_data_bytes);
2176  *col_data = col_datum.doubleval;
2177  break;
2178  }
2179  case kTEXT:
2180  case kVARCHAR:
2181  case kCHAR: {
2182  switch (cd->columnType.get_compression()) {
2183  case kENCODING_NONE:
2184  str_col_buffers[col_ids[col_idx]].push_back(
2185  col_datum.stringval ? *col_datum.stringval : "");
2186  break;
2187  case kENCODING_DICT: {
2188  switch (cd->columnType.get_size()) {
2189  case 1:
2190  int_col_val = insert_one_dict_str(
2191  reinterpret_cast<uint8_t*>(col_data_bytes), cd, col_cv, cat_);
2192  break;
2193  case 2:
2194  int_col_val = insert_one_dict_str(
2195  reinterpret_cast<uint16_t*>(col_data_bytes), cd, col_cv, cat_);
2196  break;
2197  case 4:
2198  int_col_val = insert_one_dict_str(
2199  reinterpret_cast<int32_t*>(col_data_bytes), cd, col_cv, cat_);
2200  break;
2201  default:
2202  CHECK(false);
2203  }
2204  break;
2205  }
2206  default:
2207  CHECK(false);
2208  }
2209  break;
2210  }
2211  case kTIME:
2212  case kTIMESTAMP:
2213  case kDATE: {
2214  auto col_data = reinterpret_cast<int64_t*>(col_data_bytes);
2215  *col_data = col_cv->get_is_null() ? inline_fixed_encoding_null_val(cd->columnType)
2216  : col_datum.bigintval;
2217  break;
2218  }
2219  case kARRAY: {
2220  const auto is_null = col_cv->get_is_null();
2221  const auto size = cd->columnType.get_size();
2222  const SQLTypeInfo elem_ti = cd->columnType.get_elem_type();
2223  // POINT coords: [un]compressed coords always need to be encoded, even if NULL
2224  const auto is_point_coords = (cd->isGeoPhyCol && elem_ti.get_type() == kTINYINT);
2225  if (is_null && !is_point_coords) {
2226  if (size > 0) {
2227  // NULL fixlen array: NULL_ARRAY sentinel followed by NULL sentinels
2228  if (elem_ti.is_string() && elem_ti.get_compression() == kENCODING_DICT) {
2229  throw std::runtime_error("Column " + cd->columnName +
2230  " doesn't accept NULL values");
2231  }
2232  int8_t* buf = (int8_t*)checked_malloc(size);
2233  put_null_array(static_cast<void*>(buf), elem_ti, "");
2234  for (int8_t* p = buf + elem_ti.get_size(); (p - buf) < size;
2235  p += elem_ti.get_size()) {
2236  put_null(static_cast<void*>(p), elem_ti, "");
2237  }
2238  arr_col_buffers[col_ids[col_idx]].emplace_back(size, buf, is_null);
2239  } else {
2240  arr_col_buffers[col_ids[col_idx]].emplace_back(0, nullptr, is_null);
2241  }
2242  break;
2243  }
2244  const auto l = col_cv->get_value_list();
2245  size_t len = l.size() * elem_ti.get_size();
2246  if (size > 0 && static_cast<size_t>(size) != len) {
2247  throw std::runtime_error("Array column " + cd->columnName + " expects " +
2248  std::to_string(size / elem_ti.get_size()) +
2249  " values, " + "received " + std::to_string(l.size()));
2250  }
2251  if (elem_ti.is_string()) {
2252  CHECK(kENCODING_DICT == elem_ti.get_compression());
2253  CHECK(4 == elem_ti.get_size());
2254 
2255  int8_t* buf = (int8_t*)checked_malloc(len);
2256  int32_t* p = reinterpret_cast<int32_t*>(buf);
2257 
2258  int elemIndex = 0;
2259  for (auto& e : l) {
2260  auto c = std::dynamic_pointer_cast<Analyzer::Constant>(e);
2261  CHECK(c);
2262 
2263  int_col_val = insert_one_dict_str(
2264  &p[elemIndex], cd->columnName, elem_ti, c.get(), cat_);
2265 
2266  elemIndex++;
2267  }
2268  arr_col_buffers[col_ids[col_idx]].push_back(ArrayDatum(len, buf, is_null));
2269 
2270  } else {
2271  int8_t* buf = (int8_t*)checked_malloc(len);
2272  int8_t* p = buf;
2273  for (auto& e : l) {
2274  auto c = std::dynamic_pointer_cast<Analyzer::Constant>(e);
2275  CHECK(c);
2276  p = appendDatum(p, c->get_constval(), elem_ti);
2277  }
2278  arr_col_buffers[col_ids[col_idx]].push_back(ArrayDatum(len, buf, is_null));
2279  }
2280  break;
2281  }
2282  case kPOINT:
2283  case kLINESTRING:
2284  case kPOLYGON:
2285  case kMULTIPOLYGON:
2286  str_col_buffers[col_ids[col_idx]].push_back(
2287  col_datum.stringval ? *col_datum.stringval : "");
2288  break;
2289  default:
2290  CHECK(false);
2291  }
2292  ++col_idx;
2293  if (col_idx == static_cast<size_t>(table_descriptor->shardedColumnId)) {
2294  const auto shard_count = shard_tables.size();
2295  const size_t shard_idx = SHARD_FOR_KEY(int_col_val, shard_count);
2296  shard = shard_tables[shard_idx];
2297  }
2298  }
2299  for (const auto& kv : col_buffers) {
2300  insert_data.columnIds.push_back(kv.first);
2301  DataBlockPtr p;
2302  p.numbersPtr = reinterpret_cast<int8_t*>(kv.second.get());
2303  insert_data.data.push_back(p);
2304  }
2305  for (auto& kv : str_col_buffers) {
2306  insert_data.columnIds.push_back(kv.first);
2307  DataBlockPtr p;
2308  p.stringsPtr = &kv.second;
2309  insert_data.data.push_back(p);
2310  }
2311  for (auto& kv : arr_col_buffers) {
2312  insert_data.columnIds.push_back(kv.first);
2313  DataBlockPtr p;
2314  p.arraysPtr = &kv.second;
2315  insert_data.data.push_back(p);
2316  }
2317  insert_data.numRows = 1;
2318  if (shard) {
2319  shard->fragmenter->insertData(insert_data);
2320  } else {
2321  table_descriptor->fragmenter->insertData(insert_data);
2322  }
2323 
2324  auto rs = std::make_shared<ResultSet>(TargetInfoList{},
2327  executor_->getRowSetMemoryOwner(),
2328  executor_);
2329  std::vector<TargetMetaInfo> empty_targets;
2330  return {rs, empty_targets};
2331 }
#define CHECK_EQ(x, y)
Definition: Logger.h:205
bool is_string() const
Definition: sqltypes.h:417
Definition: sqltypes.h:51
int64_t insert_one_dict_str(T *col_data, const ColumnDescriptor *cd, const Analyzer::Constant *col_cv, const Catalog_Namespace::Catalog &catalog)
std::vector< std::string > * stringsPtr
Definition: sqltypes.h:150
std::vector< ArrayDatum > * arraysPtr
Definition: sqltypes.h:151
const TableDescriptor * getMetadataForTable(const std::string &tableName, const bool populateFragmenter=true) const
Returns a pointer to a const TableDescriptor struct matching the provided tableName.
Definition: sqldefs.h:49
HOST DEVICE int get_size() const
Definition: sqltypes.h:269
std::vector< TargetInfo > TargetInfoList
std::vector< const TableDescriptor * > getPhysicalTablesDescriptors(const TableDescriptor *logicalTableDesc) const
Definition: Catalog.cpp:3619
HOST DEVICE EncodingType get_compression() const
Definition: sqltypes.h:267
std::string to_string(char const *&&v)
int tableId
identifies the table into which the data is being inserted
Definition: Fragmenter.h:61
std::conditional_t< is_cuda_compiler(), DeviceArrayDatum, HostArrayDatum > ArrayDatum
Definition: sqltypes.h:131
size_t numRows
the number of rows being inserted
Definition: Fragmenter.h:63
void * checked_malloc(const size_t size)
Definition: checked_alloc.h:44
void put_null_array(void *ndptr, const SQLTypeInfo &ntype, const std::string col_name)
const Catalog_Namespace::Catalog & cat_
const DBMetadata & getCurrentDB() const
Definition: Catalog.h:208
const DictDescriptor * getMetadataForDict(int dict_ref, bool loadDict=true) const
Definition: Catalog.cpp:1451
void put_null(void *ndptr, const SQLTypeInfo &ntype, const std::string col_name)
Definition: sqltypes.h:54
Definition: sqltypes.h:55
int8_t * appendDatum(int8_t *buf, Datum d, const SQLTypeInfo &ti)
Definition: sqltypes.h:871
bool is_null(const T &v, const SQLTypeInfo &t)
std::vector< DataBlockPtr > data
the per-column data blocks holding the values being inserted (one entry per column id)
Definition: Fragmenter.h:64
Definition: sqltypes.h:43
SQLTypeInfo get_elem_type() const
Definition: sqltypes.h:624
#define CHECK(condition)
Definition: Logger.h:197
HOST DEVICE SQLTypes get_type() const
Definition: sqltypes.h:259
int64_t inline_fixed_encoding_null_val(const SQL_TYPE_INFO &ti)
The data to be inserted using the fragment manager.
Definition: Fragmenter.h:59
Definition: sqltypes.h:47
specifies the content in-memory of a row in the table metadata table
int8_t * numbersPtr
Definition: sqltypes.h:149
std::vector< int > columnIds
a vector of column ids for the row(s) being inserted
Definition: Fragmenter.h:62
Executor * executor_
#define SHARD_FOR_KEY(key, num_shards)
Definition: shard_key.h:20
const ColumnDescriptor * get_column_descriptor(const int col_id, const int table_id, const Catalog_Namespace::Catalog &cat)
Definition: Execute.h:154
+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ executeSort()

ExecutionResult RelAlgExecutor::executeSort ( const RelSort *  sort,
const CompilationOptions &  co,
const ExecutionOptions &  eo,
RenderInfo *  render_info,
const int64_t  queue_time_ms 
)
private

Definition at line 2363 of file RelAlgExecutor.cpp.

References GroupByAndAggregate::addTransientStringLiterals(), CHECK, CHECK_EQ, anonymous_namespace{RelAlgExecutor.cpp}::check_sort_node_source_constraint(), RelSort::collationCount(), DEBUG_TIMER, ResultSet::executor_, anonymous_namespace{RelAlgExecutor.cpp}::first_oe_is_desc(), g_cluster, anonymous_namespace{RelAlgExecutor.cpp}::get_order_entries(), RelAlgNode::getId(), RelAlgNode::getInput(), RelSort::getLimit(), RelSort::getOffset(), RelSort::isEmptyResult(), anonymous_namespace{RelAlgExecutor.cpp}::node_is_aggregate(), ExecutionOptions::output_columnar_hint, run_benchmark_import::result, RelAlgNode::setOutputMetainfo(), ResultSet::sort(), and use_speculative_top_n().

2367  {
2368  auto timer = DEBUG_TIMER(__func__);
2370  const auto source = sort->getInput(0);
2371  const bool is_aggregate = node_is_aggregate(source);
2372  auto it = leaf_results_.find(sort->getId());
2373  if (it != leaf_results_.end()) {
2374  // Add any transient string literals to the sdp on the agg
2375  const auto source_work_unit = createSortInputWorkUnit(sort, eo);
2377  source_work_unit.exe_unit, executor_, executor_->row_set_mem_owner_);
2378  // Handle push-down for LIMIT for multi-node
2379  auto& aggregated_result = it->second;
2380  auto& result_rows = aggregated_result.rs;
2381  const size_t limit = sort->getLimit();
2382  const size_t offset = sort->getOffset();
2383  const auto order_entries = get_order_entries(sort);
2384  if (limit || offset) {
2385  if (!order_entries.empty()) {
2386  result_rows->sort(order_entries, limit + offset);
2387  }
2388  result_rows->dropFirstN(offset);
2389  if (limit) {
2390  result_rows->keepFirstN(limit);
2391  }
2392  }
2393  ExecutionResult result(result_rows, aggregated_result.targets_meta);
2394  sort->setOutputMetainfo(aggregated_result.targets_meta);
2395  return result;
2396  }
2397 
2398  std::list<std::shared_ptr<Analyzer::Expr>> groupby_exprs;
2399  bool is_desc{false};
2400 
2401  auto execute_sort_query = [this,
2402  sort,
2403  &source,
2404  &is_aggregate,
2405  &eo,
2406  &co,
2407  render_info,
2408  queue_time_ms,
2409  &groupby_exprs,
2410  &is_desc]() -> ExecutionResult {
2411  const auto source_work_unit = createSortInputWorkUnit(sort, eo);
2412  is_desc = first_oe_is_desc(source_work_unit.exe_unit.sort_info.order_entries);
2413  ExecutionOptions eo_copy = {
2415  eo.allow_multifrag,
2416  eo.just_explain,
2417  eo.allow_loop_joins,
2418  eo.with_watchdog,
2419  eo.jit_debug,
2420  eo.just_validate || sort->isEmptyResult(),
2421  eo.with_dynamic_watchdog,
2422  eo.dynamic_watchdog_time_limit,
2423  eo.find_push_down_candidates,
2424  eo.just_calcite_explain,
2425  eo.gpu_input_mem_limit_percent,
2426  eo.allow_runtime_query_interrupt,
2427  eo.runtime_query_interrupt_frequency,
2428  eo.executor_type,
2429  };
2430 
2431  groupby_exprs = source_work_unit.exe_unit.groupby_exprs;
2432  auto source_result = executeWorkUnit(source_work_unit,
2433  source->getOutputMetainfo(),
2434  is_aggregate,
2435  co,
2436  eo_copy,
2437  render_info,
2438  queue_time_ms);
2439  if (render_info && render_info->isPotentialInSituRender()) {
2440  return source_result;
2441  }
2442  if (source_result.isFilterPushDownEnabled()) {
2443  return source_result;
2444  }
2445  auto rows_to_sort = source_result.getRows();
2446  if (eo.just_explain) {
2447  return {rows_to_sort, {}};
2448  }
2449  const size_t limit = sort->getLimit();
2450  const size_t offset = sort->getOffset();
2451  if (sort->collationCount() != 0 && !rows_to_sort->definitelyHasNoRows() &&
2452  !use_speculative_top_n(source_work_unit.exe_unit,
2453  rows_to_sort->getQueryMemDesc())) {
2454  rows_to_sort->sort(source_work_unit.exe_unit.sort_info.order_entries,
2455  limit + offset);
2456  }
2457  if (limit || offset) {
2458  if (g_cluster && sort->collationCount() == 0) {
2459  if (offset >= rows_to_sort->rowCount()) {
2460  rows_to_sort->dropFirstN(offset);
2461  } else {
2462  rows_to_sort->keepFirstN(limit + offset);
2463  }
2464  } else {
2465  rows_to_sort->dropFirstN(offset);
2466  if (limit) {
2467  rows_to_sort->keepFirstN(limit);
2468  }
2469  }
2470  }
2471  return {rows_to_sort, source_result.getTargetsMeta()};
2472  };
2473 
2474  try {
2475  return execute_sort_query();
2476  } catch (const SpeculativeTopNFailed& e) {
2477  CHECK_EQ(size_t(1), groupby_exprs.size());
2478  CHECK(groupby_exprs.front());
2479  speculative_topn_blacklist_.add(groupby_exprs.front(), is_desc);
2480  return execute_sort_query();
2481  }
2482 }
#define CHECK_EQ(x, y)
Definition: Logger.h:205
size_t collationCount() const
unsigned getId() const
static SpeculativeTopNBlacklist speculative_topn_blacklist_
bool use_speculative_top_n(const RelAlgExecutionUnit &ra_exe_unit, const QueryMemoryDescriptor &query_mem_desc)
void check_sort_node_source_constraint(const RelSort *sort)
std::unordered_map< unsigned, AggregatedResult > leaf_results_
void setOutputMetainfo(const std::vector< TargetMetaInfo > &targets_metainfo) const
size_t getOffset() const
bool node_is_aggregate(const RelAlgNode *ra)
const std::shared_ptr< ResultSet > & getRows() const
size_t getLimit() const
#define CHECK(condition)
Definition: Logger.h:197
#define DEBUG_TIMER(name)
Definition: Logger.h:313
std::list< Analyzer::OrderEntry > get_order_entries(const RelSort *sort)
bool g_cluster
const RelAlgNode * getInput(const size_t idx) const
void add(const std::shared_ptr< Analyzer::Expr > expr, const bool desc)
ExecutionResult executeWorkUnit(const WorkUnit &work_unit, const std::vector< TargetMetaInfo > &targets_meta, const bool is_agg, const CompilationOptions &co_in, const ExecutionOptions &eo, RenderInfo *, const int64_t queue_time_ms, const std::optional< size_t > previous_count=std::nullopt)
bool isEmptyResult() const
Executor * executor_
WorkUnit createSortInputWorkUnit(const RelSort *, const ExecutionOptions &eo)
bool first_oe_is_desc(const std::list< Analyzer::OrderEntry > &order_entries)
+ Here is the call graph for this function:

◆ executeTableFunction()

ExecutionResult RelAlgExecutor::executeTableFunction ( const RelTableFunction *  table_func,
const CompilationOptions &  co_in,
const ExecutionOptions &  eo,
const int64_t  queue_time_ms 
)
private

Definition at line 1652 of file RelAlgExecutor.cpp.

References CHECK, DEBUG_TIMER, Executor::ERR_OUT_OF_GPU_MEM, ResultSet::executor_, g_cluster, g_enable_table_functions, get_table_infos(), QueryExecutionError::getErrorCode(), GPU, INJECT_TIMER, ExecutionOptions::just_explain, and run_benchmark_import::result.

1655  {
1657  auto timer = DEBUG_TIMER(__func__);
1658 
1659  auto co = co_in;
1660 
1661  if (g_cluster) {
1662  throw std::runtime_error("Table functions not supported in distributed mode yet");
1663  }
1664  if (!g_enable_table_functions) {
1665  throw std::runtime_error("Table function support is disabled");
1666  }
1667  auto table_func_work_unit = createTableFunctionWorkUnit(
1668  table_func,
1669  eo.just_explain,
1670  /*is_gpu = */ co.device_type == ExecutorDeviceType::GPU);
1671  const auto body = table_func_work_unit.body;
1672  CHECK(body);
1673 
1674  const auto table_infos =
1675  get_table_infos(table_func_work_unit.exe_unit.input_descs, executor_);
1676 
1677  ExecutionResult result{std::make_shared<ResultSet>(std::vector<TargetInfo>{},
1678  co.device_type,
1680  nullptr,
1681  executor_),
1682  {}};
1683 
1684  try {
1685  result = {executor_->executeTableFunction(
1686  table_func_work_unit.exe_unit, table_infos, co, eo, cat_),
1687  body->getOutputMetainfo()};
1688  } catch (const QueryExecutionError& e) {
1691  throw std::runtime_error("Table function ran out of memory during execution");
1692  }
1693  result.setQueueTime(queue_time_ms);
1694  return result;
1695 }
int32_t getErrorCode() const
Definition: ErrorHandling.h:55
TableFunctionWorkUnit createTableFunctionWorkUnit(const RelTableFunction *table_func, const bool just_explain, const bool is_gpu)
#define INJECT_TIMER(DESC)
Definition: measure.h:93
const Catalog_Namespace::Catalog & cat_
static const int32_t ERR_OUT_OF_GPU_MEM
Definition: Execute.h:982
ExecutionResult executeTableFunction(const RelTableFunction *, const CompilationOptions &, const ExecutionOptions &, const int64_t queue_time_ms)
static void handlePersistentError(const int32_t error_code)
#define CHECK(condition)
Definition: Logger.h:197
std::vector< InputTableInfo > get_table_infos(const std::vector< InputDescriptor > &input_descs, Executor *executor)
#define DEBUG_TIMER(name)
Definition: Logger.h:313
bool g_cluster
Executor * executor_
bool g_enable_table_functions
Definition: Execute.cpp:99
+ Here is the call graph for this function:

◆ executeUnion()

ExecutionResult RelAlgExecutor::executeUnion ( const RelLogicalUnion *  logical_union,
const RaExecutionSequence &  seq,
const CompilationOptions &  co,
const ExecutionOptions &  eo,
RenderInfo *  render_info,
const int64_t  queue_time_ms 
)
private

Definition at line 1860 of file RelAlgExecutor.cpp.

References RelLogicalUnion::checkForMatchingMetaInfoTypes(), DEBUG_TIMER, Default, RelAlgNode::getInput(), RelAlgNode::getOutputMetainfo(), RelAlgNode::hasInput(), RelLogicalUnion::isAll(), isGeometry(), CompilationOptions::makeCpuOnly(), and RelAlgNode::setOutputMetainfo().

1865  {
1866  auto timer = DEBUG_TIMER(__func__);
1867  if (!logical_union->isAll()) {
1868  throw std::runtime_error("UNION without ALL is not supported yet.");
1869  }
1870  // Will throw a std::runtime_error if types don't match.
1871  logical_union->checkForMatchingMetaInfoTypes();
1872  logical_union->setOutputMetainfo(logical_union->getInput(0)->getOutputMetainfo());
1873  if (boost::algorithm::any_of(logical_union->getOutputMetainfo(), isGeometry)) {
1874  throw std::runtime_error("UNION does not support subqueries with geo-columns.");
1875  }
1876  // Only Projections and Aggregates from a UNION are supported for now.
1877  query_dag_->eachNode([logical_union](RelAlgNode const* node) {
1878  if (node->hasInput(logical_union) &&
1879  !shared::dynamic_castable_to_any<RelProject, RelLogicalUnion, RelAggregate>(
1880  node)) {
1881  throw std::runtime_error("UNION ALL not yet supported in this context.");
1882  }
1883  });
1884 
1885  auto work_unit =
1886  createUnionWorkUnit(logical_union, {{}, SortAlgorithm::Default, 0, 0}, eo);
1887  return executeWorkUnit(work_unit,
1888  logical_union->getOutputMetainfo(),
1889  false,
1891  eo,
1892  render_info,
1893  queue_time_ms);
1894 }
void checkForMatchingMetaInfoTypes() const
bool isAll() const
bool hasInput(const RelAlgNode *needle) const
void setOutputMetainfo(const std::vector< TargetMetaInfo > &targets_metainfo) const
static CompilationOptions makeCpuOnly(const CompilationOptions &in)
bool isGeometry(TargetMetaInfo const &target_meta_info)
WorkUnit createUnionWorkUnit(const RelLogicalUnion *, const SortInfo &, const ExecutionOptions &eo)
const std::vector< TargetMetaInfo > & getOutputMetainfo() const
#define DEBUG_TIMER(name)
Definition: Logger.h:313
std::unique_ptr< RelAlgDagBuilder > query_dag_
const RelAlgNode * getInput(const size_t idx) const
ExecutionResult executeWorkUnit(const WorkUnit &work_unit, const std::vector< TargetMetaInfo > &targets_meta, const bool is_agg, const CompilationOptions &co_in, const ExecutionOptions &eo, RenderInfo *, const int64_t queue_time_ms, const std::optional< size_t > previous_count=std::nullopt)
+ Here is the call graph for this function:

◆ executeUpdate()

void RelAlgExecutor::executeUpdate ( const RelAlgNode *  node,
const CompilationOptions &  co,
const ExecutionOptions &  eo,
const int64_t  queue_time_ms 
)
private

Definition at line 1384 of file RelAlgExecutor.cpp.

References CompilationOptions::allow_lazy_fetch, CHECK, CHECK_EQ, DEBUG_TIMER, Default, ResultSet::executor_, CompilationOptions::filter_on_deleted_column, get_table_infos(), get_temporary_table(), CompilationOptions::hoist_literals, CacheInvalidator< CACHE_HOLDING_TYPES >::invalidateCaches(), and CompilationOptions::makeCpuOnly().

1387  {
1388  CHECK(node);
1389  auto timer = DEBUG_TIMER(__func__);
1390 
1392 
1393  auto co = co_in;
1394  co.hoist_literals = false; // disable literal hoisting as it interferes with dict
1395  // encoded string updates
1396 
1397  auto execute_update_for_node =
1398  [this, &co, &eo_in](const auto node, auto& work_unit, const bool is_aggregate) {
1399  UpdateTransactionParameters update_params(node->getModifiedTableDescriptor(),
1400  node->getTargetColumns(),
1401  node->getOutputMetainfo(),
1402  node->isVarlenUpdateRequired());
1403 
1404  const auto table_infos = get_table_infos(work_unit.exe_unit, executor_);
1405 
1406  auto execute_update_ra_exe_unit =
1407  [this, &co, &eo_in, &table_infos, &update_params](
1408  const RelAlgExecutionUnit& ra_exe_unit, const bool is_aggregate) {
1410 
1411  auto eo = eo_in;
1412  if (update_params.tableIsTemporary()) {
1413  eo.output_columnar_hint = true;
1414  co_project.allow_lazy_fetch = false;
1415  co_project.filter_on_deleted_column =
1416  false; // project the entire delete column for columnar update
1417  }
1418 
1419  auto update_callback = yieldUpdateCallback(update_params);
1420  executor_->executeUpdate(ra_exe_unit,
1421  table_infos,
1422  co_project,
1423  eo,
1424  cat_,
1425  executor_->row_set_mem_owner_,
1426  update_callback,
1427  is_aggregate);
1428  update_params.finalizeTransaction();
1429  };
1430 
1431  if (update_params.tableIsTemporary()) {
1432  // hold owned target exprs during execution if rewriting
1433  auto query_rewrite = std::make_unique<QueryRewriter>(table_infos, executor_);
1434  // rewrite temp table updates to generate the full column by moving the where
1435  // clause into a case if such a rewrite is not possible, bail on the update
1436  // operation build an expr for the update target
1437  const auto td = update_params.getTableDescriptor();
1438  CHECK(td);
1439  const auto update_column_names = update_params.getUpdateColumnNames();
1440  if (update_column_names.size() > 1) {
1441  throw std::runtime_error(
1442  "Multi-column update is not yet supported for temporary tables.");
1443  }
1444 
1445  auto cd = cat_.getMetadataForColumn(td->tableId, update_column_names.front());
1446  CHECK(cd);
1447  auto projected_column_to_update =
1448  makeExpr<Analyzer::ColumnVar>(cd->columnType, td->tableId, cd->columnId, 0);
1449  const auto rewritten_exe_unit = query_rewrite->rewriteColumnarUpdate(
1450  work_unit.exe_unit, projected_column_to_update);
1451  if (rewritten_exe_unit.target_exprs.front()->get_type_info().is_varlen()) {
1452  throw std::runtime_error(
1453  "Variable length updates not yet supported on temporary tables.");
1454  }
1455  execute_update_ra_exe_unit(rewritten_exe_unit, is_aggregate);
1456  } else {
1457  execute_update_ra_exe_unit(work_unit.exe_unit, is_aggregate);
1458  }
1459  };
1460 
1461  if (auto compound = dynamic_cast<const RelCompound*>(node)) {
1462  auto work_unit =
1463  createCompoundWorkUnit(compound, {{}, SortAlgorithm::Default, 0, 0}, eo_in);
1464 
1465  execute_update_for_node(compound, work_unit, compound->isAggregate());
1466  } else if (auto project = dynamic_cast<const RelProject*>(node)) {
1467  auto work_unit =
1468  createProjectWorkUnit(project, {{}, SortAlgorithm::Default, 0, 0}, eo_in);
1469 
1470  if (project->isSimple()) {
1471  CHECK_EQ(size_t(1), project->inputCount());
1472  const auto input_ra = project->getInput(0);
1473  if (dynamic_cast<const RelSort*>(input_ra)) {
1474  const auto& input_table =
1475  get_temporary_table(&temporary_tables_, -input_ra->getId());
1476  CHECK(input_table);
1477  work_unit.exe_unit.scan_limit = input_table->rowCount();
1478  }
1479  }
1480 
1481  execute_update_for_node(project, work_unit, false);
1482  } else {
1483  throw std::runtime_error("Unsupported parent node for update: " + node->toString());
1484  }
1485 }
#define CHECK_EQ(x, y)
Definition: Logger.h:205
UpdateCallback yieldUpdateCallback(UpdateTransactionParameters &update_parameters)
WorkUnit createProjectWorkUnit(const RelProject *, const SortInfo &, const ExecutionOptions &eo)
TemporaryTables temporary_tables_
const ColumnDescriptor * getMetadataForColumn(int tableId, const std::string &colName) const
static void invalidateCaches()
WorkUnit createCompoundWorkUnit(const RelCompound *, const SortInfo &, const ExecutionOptions &eo)
const ResultSetPtr & get_temporary_table(const TemporaryTables *temporary_tables, const int table_id)
Definition: Execute.h:191
static CompilationOptions makeCpuOnly(const CompilationOptions &in)
const Catalog_Namespace::Catalog & cat_
#define CHECK(condition)
Definition: Logger.h:197
std::vector< InputTableInfo > get_table_infos(const std::vector< InputDescriptor > &input_descs, Executor *executor)
#define DEBUG_TIMER(name)
Definition: Logger.h:313
Executor * executor_
+ Here is the call graph for this function:

◆ executeWorkUnit()

ExecutionResult RelAlgExecutor::executeWorkUnit ( const WorkUnit &  work_unit,
const std::vector< TargetMetaInfo > &  targets_meta,
const bool  is_agg,
const CompilationOptions &  co_in,
const ExecutionOptions &  eo,
RenderInfo *  render_info,
const int64_t  queue_time_ms,
const std::optional< size_t >  previous_count = std::nullopt 
)
private

Definition at line 2682 of file RelAlgExecutor.cpp.

References GroupByAndAggregate::addTransientStringLiterals(), RelAlgExecutor::WorkUnit::body, anonymous_namespace{RelAlgExecutor.cpp}::build_render_targets(), anonymous_namespace{RelAlgExecutor.cpp}::can_use_bump_allocator(), CHECK, CHECK_EQ, CHECK_GT, anonymous_namespace{RelAlgExecutor.cpp}::compute_output_buffer_size(), CPU, DEBUG_TIMER, anonymous_namespace{RelAlgExecutor.cpp}::decide_approx_count_distinct_implementation(), RelAlgExecutor::WorkUnit::exe_unit, anonymous_namespace{RelAlgExecutor.cpp}::exe_unit_has_quals(), ResultSet::executor_, ExecutionOptions::executor_type, Extern, ExecutionOptions::find_push_down_candidates, g_big_group_threshold, g_enable_window_functions, get_table_infos(), QueryExecutionError::getErrorCode(), anonymous_namespace{RelAlgExecutor.cpp}::groups_approx_upper_bound(), INJECT_TIMER, anonymous_namespace{RelAlgExecutor.cpp}::is_agg(), anonymous_namespace{RelAlgExecutor.cpp}::is_window_execution_unit(), RenderInfo::isPotentialInSituRender(), ExecutionOptions::just_calcite_explain, ExecutionOptions::just_explain, ExecutionOptions::just_validate, RelAlgExecutor::WorkUnit::max_groups_buffer_entry_guess, ra_exec_unit_desc_for_caching(), CardinalityEstimationRequired::range(), run_benchmark_import::result, RelAlgExecutionUnit::target_exprs, VLOG, and QueryExecutionError::wasMultifragKernelLaunch().

2690  {
2692  auto timer = DEBUG_TIMER(__func__);
2693 
2694  auto co = co_in;
2695  ColumnCacheMap column_cache;
2696  if (is_window_execution_unit(work_unit.exe_unit)) {
2698  throw std::runtime_error("Window functions support is disabled");
2699  }
2700  co.device_type = ExecutorDeviceType::CPU;
2701  co.allow_lazy_fetch = false;
2702  computeWindow(work_unit.exe_unit, co, eo, column_cache, queue_time_ms);
2703  }
2704  if (!eo.just_explain && eo.find_push_down_candidates) {
2705  // find potential candidates:
2706  auto selected_filters = selectFiltersToBePushedDown(work_unit, co, eo);
2707  if (!selected_filters.empty() || eo.just_calcite_explain) {
2708  return ExecutionResult(selected_filters, eo.find_push_down_candidates);
2709  }
2710  }
2711  if (render_info && render_info->isPotentialInSituRender()) {
2712  co.allow_lazy_fetch = false;
2713  }
2714  const auto body = work_unit.body;
2715  CHECK(body);
2716  auto it = leaf_results_.find(body->getId());
2717  VLOG(3) << "body->getId()=" << body->getId() << " body->toString()=" << body->toString()
2718  << " it==leaf_results_.end()=" << (it == leaf_results_.end());
2719  if (it != leaf_results_.end()) {
2721  work_unit.exe_unit, executor_, executor_->row_set_mem_owner_);
2722  auto& aggregated_result = it->second;
2723  auto& result_rows = aggregated_result.rs;
2724  ExecutionResult result(result_rows, aggregated_result.targets_meta);
2725  body->setOutputMetainfo(aggregated_result.targets_meta);
2726  if (render_info) {
2727  build_render_targets(*render_info, work_unit.exe_unit.target_exprs, targets_meta);
2728  }
2729  return result;
2730  }
2731  const auto table_infos = get_table_infos(work_unit.exe_unit, executor_);
2732 
2734  work_unit.exe_unit, table_infos, executor_, co.device_type, target_exprs_owned_);
2735  auto max_groups_buffer_entry_guess = work_unit.max_groups_buffer_entry_guess;
2736  if (is_window_execution_unit(ra_exe_unit)) {
2737  CHECK_EQ(table_infos.size(), size_t(1));
2738  CHECK_EQ(table_infos.front().info.fragments.size(), size_t(1));
2739  max_groups_buffer_entry_guess =
2740  table_infos.front().info.fragments.front().getNumTuples();
2741  ra_exe_unit.scan_limit = max_groups_buffer_entry_guess;
2742  } else if (compute_output_buffer_size(ra_exe_unit) && !isRowidLookup(work_unit)) {
2743  if (previous_count && !exe_unit_has_quals(ra_exe_unit)) {
2744  ra_exe_unit.scan_limit = *previous_count;
2745  } else {
2746  // TODO(adb): enable bump allocator path for render queries
2747  if (can_use_bump_allocator(ra_exe_unit, co, eo) && !render_info) {
2748  ra_exe_unit.scan_limit = 0;
2749  ra_exe_unit.use_bump_allocator = true;
2750  } else if (eo.executor_type == ::ExecutorType::Extern) {
2751  ra_exe_unit.scan_limit = 0;
2752  } else if (!eo.just_explain) {
2753  const auto filter_count_all = getFilteredCountAll(work_unit, true, co, eo);
2754  if (filter_count_all) {
2755  ra_exe_unit.scan_limit = std::max(*filter_count_all, size_t(1));
2756  }
2757  }
2758  }
2759  }
2760  ExecutionResult result{std::make_shared<ResultSet>(std::vector<TargetInfo>{},
2761  co.device_type,
2763  nullptr,
2764  executor_),
2765  {}};
2766 
2767  auto execute_and_handle_errors =
2768  [&](const auto max_groups_buffer_entry_guess_in,
2769  const bool has_cardinality_estimation) -> ExecutionResult {
2770  // Note that the groups buffer entry guess may be modified during query execution.
2771  // Create a local copy so we can track those changes if we need to attempt a retry
2772  // due to OOM
2773  auto local_groups_buffer_entry_guess = max_groups_buffer_entry_guess_in;
2774  try {
2775  return {executor_->executeWorkUnit(local_groups_buffer_entry_guess,
2776  is_agg,
2777  table_infos,
2778  ra_exe_unit,
2779  co,
2780  eo,
2781  cat_,
2782  render_info,
2783  has_cardinality_estimation,
2784  column_cache),
2785  targets_meta};
2786  } catch (const QueryExecutionError& e) {
2788  return handleOutOfMemoryRetry(
2789  {ra_exe_unit, work_unit.body, local_groups_buffer_entry_guess},
2790  targets_meta,
2791  is_agg,
2792  co,
2793  eo,
2794  render_info,
2796  queue_time_ms);
2797  }
2798  };
2799 
2800  auto cache_key = ra_exec_unit_desc_for_caching(ra_exe_unit);
2801  try {
2802  auto cached_cardinality = executor_->getCachedCardinality(cache_key);
2803  auto card = cached_cardinality.second;
2804  if (cached_cardinality.first && card >= 0) {
2805  result = execute_and_handle_errors(card, true);
2806  } else {
2807  result = execute_and_handle_errors(
2808  max_groups_buffer_entry_guess,
2810  }
2811  } catch (const CardinalityEstimationRequired& e) {
2812  // check the cardinality cache
2813  auto cached_cardinality = executor_->getCachedCardinality(cache_key);
2814  auto card = cached_cardinality.second;
2815  if (cached_cardinality.first && card >= 0) {
2816  result = execute_and_handle_errors(card, true);
2817  } else {
2818  const auto estimated_groups_buffer_entry_guess =
2819  2 * std::min(groups_approx_upper_bound(table_infos),
2820  getNDVEstimation(work_unit, e.range(), is_agg, co, eo));
2821  CHECK_GT(estimated_groups_buffer_entry_guess, size_t(0));
2822  result = execute_and_handle_errors(estimated_groups_buffer_entry_guess, true);
2823  if (!(eo.just_validate || eo.just_explain)) {
2824  executor_->addToCardinalityCache(cache_key, estimated_groups_buffer_entry_guess);
2825  }
2826  }
2827  }
2828 
2829  result.setQueueTime(queue_time_ms);
2830  if (render_info) {
2831  build_render_targets(*render_info, work_unit.exe_unit.target_exprs, targets_meta);
2832  if (render_info->isPotentialInSituRender()) {
2833  // return an empty result (with the same queue time, and zero render time)
2834  return {std::make_shared<ResultSet>(
2835  queue_time_ms,
2836  0,
2837  executor_->row_set_mem_owner_
2838  ? executor_->row_set_mem_owner_->cloneStrDictDataOnly()
2839  : nullptr),
2840  {}};
2841  }
2842  }
2843  return result;
2844 }
bool is_agg(const Analyzer::Expr *expr)
int32_t getErrorCode() const
Definition: ErrorHandling.h:55
#define CHECK_EQ(x, y)
Definition: Logger.h:205
bool can_use_bump_allocator(const RelAlgExecutionUnit &ra_exe_unit, const CompilationOptions &co, const ExecutionOptions &eo)
std::string ra_exec_unit_desc_for_caching(const RelAlgExecutionUnit &ra_exe_unit)
Definition: Execute.cpp:1191
std::unordered_map< int, std::unordered_map< int, std::shared_ptr< const ColumnarResults > >> ColumnCacheMap
bool wasMultifragKernelLaunch() const
Definition: ErrorHandling.h:57
std::vector< PushedDownFilterInfo > selectFiltersToBePushedDown(const RelAlgExecutor::WorkUnit &work_unit, const CompilationOptions &co, const ExecutionOptions &eo)
std::unordered_map< unsigned, AggregatedResult > leaf_results_
#define CHECK_GT(x, y)
Definition: Logger.h:209
bool is_window_execution_unit(const RelAlgExecutionUnit &ra_exe_unit)
void computeWindow(const RelAlgExecutionUnit &ra_exe_unit, const CompilationOptions &co, const ExecutionOptions &eo, ColumnCacheMap &column_cache_map, const int64_t queue_time_ms)
std::vector< std::shared_ptr< Analyzer::Expr > > target_exprs_owned_
bool isPotentialInSituRender() const
Definition: RenderInfo.cpp:64
const bool find_push_down_candidates
const bool just_validate
ExecutorType executor_type
#define INJECT_TIMER(DESC)
Definition: measure.h:93
void build_render_targets(RenderInfo &render_info, const std::vector< Analyzer::Expr *> &work_unit_target_exprs, const std::vector< TargetMetaInfo > &targets_meta)
const Catalog_Namespace::Catalog & cat_
size_t g_big_group_threshold
Definition: Execute.cpp:97
ExecutionResult handleOutOfMemoryRetry(const RelAlgExecutor::WorkUnit &work_unit, const std::vector< TargetMetaInfo > &targets_meta, const bool is_agg, const CompilationOptions &co, const ExecutionOptions &eo, RenderInfo *render_info, const bool was_multifrag_kernel_launch, const int64_t queue_time_ms)
size_t groups_approx_upper_bound(const std::vector< InputTableInfo > &table_infos)
size_t getNDVEstimation(const WorkUnit &work_unit, const int64_t range, const bool is_agg, const CompilationOptions &co, const ExecutionOptions &eo)
std::optional< size_t > getFilteredCountAll(const WorkUnit &work_unit, const bool is_agg, const CompilationOptions &co, const ExecutionOptions &eo)
bool g_enable_window_functions
Definition: Execute.cpp:98
const bool just_calcite_explain
bool isRowidLookup(const WorkUnit &work_unit)
bool exe_unit_has_quals(const RelAlgExecutionUnit ra_exe_unit)
static void handlePersistentError(const int32_t error_code)
bool compute_output_buffer_size(const RelAlgExecutionUnit &ra_exe_unit)
#define CHECK(condition)
Definition: Logger.h:197
std::vector< InputTableInfo > get_table_infos(const std::vector< InputDescriptor > &input_descs, Executor *executor)
#define DEBUG_TIMER(name)
Definition: Logger.h:313
RelAlgExecutionUnit decide_approx_count_distinct_implementation(const RelAlgExecutionUnit &ra_exe_unit_in, const std::vector< InputTableInfo > &table_infos, const Executor *executor, const ExecutorDeviceType device_type_in, std::vector< std::shared_ptr< Analyzer::Expr >> &target_exprs_owned)
ExecutionResult executeWorkUnit(const WorkUnit &work_unit, const std::vector< TargetMetaInfo > &targets_meta, const bool is_agg, const CompilationOptions &co_in, const ExecutionOptions &eo, RenderInfo *, const int64_t queue_time_ms, const std::optional< size_t > previous_count=std::nullopt)
Executor * executor_
#define VLOG(n)
Definition: Logger.h:291
+ Here is the call graph for this function:

◆ getErrorMessageFromCode()

std::string RelAlgExecutor::getErrorMessageFromCode ( const int32_t  error_code)
static

Definition at line 3069 of file RelAlgExecutor.cpp.

References Executor::ERR_COLUMNAR_CONVERSION_NOT_SUPPORTED, Executor::ERR_DIV_BY_ZERO, Executor::ERR_GEOS, Executor::ERR_INTERRUPTED, Executor::ERR_OUT_OF_CPU_MEM, Executor::ERR_OUT_OF_GPU_MEM, Executor::ERR_OUT_OF_RENDER_MEM, Executor::ERR_OUT_OF_TIME, Executor::ERR_OVERFLOW_OR_UNDERFLOW, Executor::ERR_SINGLE_VALUE_FOUND_MULTIPLE_VALUES, Executor::ERR_STREAMING_TOP_N_NOT_SUPPORTED_IN_RENDER_QUERY, Executor::ERR_STRING_CONST_IN_RESULTSET, Executor::ERR_TOO_MANY_LITERALS, Executor::ERR_UNSUPPORTED_SELF_JOIN, and to_string().

3069  {
3070  if (error_code < 0) {
3071  return "Ran out of slots in the query output buffer";
3072  }
3073  switch (error_code) {
3075  return "Division by zero";
3077  return "Query couldn't keep the entire working set of columns in GPU memory";
3079  return "Self joins not supported yet";
3081  return "Not enough host memory to execute the query";
3083  return "Overflow or underflow";
3085  return "Query execution has exceeded the time limit";
3087  return "Query execution has been interrupted";
3089  return "Columnar conversion not supported for variable length types";
3091  return "Too many literals in the query";
3093  return "NONE ENCODED String types are not supported as input result set.";
3095  return "Not enough OpenGL memory to render the query results";
3097  return "Streaming-Top-N not supported in Render Query";
3099  return "Multiple distinct values encountered";
3100  case Executor::ERR_GEOS:
3101  return "Geos call failure";
3102  }
3103  return "Other error: code " + std::to_string(error_code);
3104 }
static const int32_t ERR_INTERRUPTED
Definition: Execute.h:989
static const int32_t ERR_GEOS
Definition: Execute.h:995
static const int32_t ERR_TOO_MANY_LITERALS
Definition: Execute.h:991
std::string to_string(char const *&&v)
static const int32_t ERR_STRING_CONST_IN_RESULTSET
Definition: Execute.h:992
static const int32_t ERR_STREAMING_TOP_N_NOT_SUPPORTED_IN_RENDER_QUERY
Definition: Execute.h:993
static const int32_t ERR_COLUMNAR_CONVERSION_NOT_SUPPORTED
Definition: Execute.h:990
static const int32_t ERR_DIV_BY_ZERO
Definition: Execute.h:981
static const int32_t ERR_OUT_OF_RENDER_MEM
Definition: Execute.h:985
const int8_t const int64_t const uint64_t const int32_t const int64_t int64_t uint32_t const int64_t int32_t * error_code
static const int32_t ERR_OVERFLOW_OR_UNDERFLOW
Definition: Execute.h:987
static const int32_t ERR_OUT_OF_TIME
Definition: Execute.h:988
static const int32_t ERR_UNSUPPORTED_SELF_JOIN
Definition: Execute.h:984
static const int32_t ERR_SINGLE_VALUE_FOUND_MULTIPLE_VALUES
Definition: Execute.h:994
static const int32_t ERR_OUT_OF_GPU_MEM
Definition: Execute.h:982
static const int32_t ERR_OUT_OF_CPU_MEM
Definition: Execute.h:986
+ Here is the call graph for this function:

◆ getExecutor()

Executor * RelAlgExecutor::getExecutor ( ) const

Definition at line 392 of file RelAlgExecutor.cpp.

References ResultSet::executor_.

392  {
393  return executor_;
394 }
Executor * executor_

◆ getFilteredCountAll()

std::optional< size_t > RelAlgExecutor::getFilteredCountAll ( const WorkUnit work_unit,
const bool  is_agg,
const CompilationOptions co,
const ExecutionOptions eo 
)
private

Definition at line 2846 of file RelAlgExecutor.cpp.

References CHECK, CHECK_EQ, CHECK_GE, create_count_all_execution_unit(), RelAlgExecutor::WorkUnit::exe_unit, ResultSet::executor_, g_bigint_count, get_table_infos(), kBIGINT, kCOUNT, kINT, LOG, and logger::WARNING.

2849  {
2850  const auto count =
2851  makeExpr<Analyzer::AggExpr>(SQLTypeInfo(g_bigint_count ? kBIGINT : kINT, false),
2852  kCOUNT,
2853  nullptr,
2854  false,
2855  nullptr);
2856  const auto count_all_exe_unit =
2857  create_count_all_execution_unit(work_unit.exe_unit, count);
2858  size_t one{1};
2859  ResultSetPtr count_all_result;
2860  try {
2861  ColumnCacheMap column_cache;
2862  count_all_result =
2863  executor_->executeWorkUnit(one,
2864  is_agg,
2865  get_table_infos(work_unit.exe_unit, executor_),
2866  count_all_exe_unit,
2867  co,
2868  eo,
2869  cat_,
2870  nullptr,
2871  false,
2872  column_cache);
2873  } catch (const std::exception& e) {
2874  LOG(WARNING) << "Failed to run pre-flight filtered count with error " << e.what();
2875  return std::nullopt;
2876  }
2877  const auto count_row = count_all_result->getNextRow(false, false);
2878  CHECK_EQ(size_t(1), count_row.size());
2879  const auto& count_tv = count_row.front();
2880  const auto count_scalar_tv = boost::get<ScalarTargetValue>(&count_tv);
2881  CHECK(count_scalar_tv);
2882  const auto count_ptr = boost::get<int64_t>(count_scalar_tv);
2883  CHECK(count_ptr);
2884  CHECK_GE(*count_ptr, 0);
2885  auto count_upper_bound = static_cast<size_t>(*count_ptr);
2886  return std::max(count_upper_bound, size_t(1));
2887 }
bool is_agg(const Analyzer::Expr *expr)
#define CHECK_EQ(x, y)
Definition: Logger.h:205
std::unordered_map< int, std::unordered_map< int, std::shared_ptr< const ColumnarResults > >> ColumnCacheMap
#define LOG(tag)
Definition: Logger.h:188
#define CHECK_GE(x, y)
Definition: Logger.h:210
std::shared_ptr< ResultSet > ResultSetPtr
RelAlgExecutionUnit create_count_all_execution_unit(const RelAlgExecutionUnit &ra_exe_unit, std::shared_ptr< Analyzer::Expr > replacement_target)
const Catalog_Namespace::Catalog & cat_
bool g_bigint_count
Definition: sqldefs.h:76
#define CHECK(condition)
Definition: Logger.h:197
std::vector< InputTableInfo > get_table_infos(const std::vector< InputDescriptor > &input_descs, Executor *executor)
Definition: sqltypes.h:47
Executor * executor_
+ Here is the call graph for this function:

◆ getFilterSelectivity()

FilterSelectivity RelAlgExecutor::getFilterSelectivity ( const std::vector< std::shared_ptr< Analyzer::Expr >> &  filter_expressions,
const CompilationOptions co,
const ExecutionOptions eo 
)
private

Given a set of filter expressions for a table, it launches a new COUNT query to compute the number of passing rows, and then generates a set of statistics related to those filters. Later, these stats are used to decide whether a filter should be pushed down or not.

Definition at line 57 of file JoinFilterPushDown.cpp.

References CHECK, CHECK_EQ, Default, g_bigint_count, get_table_infos(), kBIGINT, kCOUNT, and kINT.

60  {
61  CollectInputColumnsVisitor input_columns_visitor;
62  std::list<std::shared_ptr<Analyzer::Expr>> quals;
63  std::unordered_set<InputColDescriptor> input_column_descriptors;
64  BindFilterToOutermostVisitor bind_filter_to_outermost;
65  for (const auto& filter_expr : filter_expressions) {
66  input_column_descriptors = input_columns_visitor.aggregateResult(
67  input_column_descriptors, input_columns_visitor.visit(filter_expr.get()));
68  quals.push_back(bind_filter_to_outermost.visit(filter_expr.get()));
69  }
70  std::vector<InputDescriptor> input_descs;
71  std::list<std::shared_ptr<const InputColDescriptor>> input_col_descs;
72  for (const auto& input_col_desc : input_column_descriptors) {
73  if (input_descs.empty()) {
74  input_descs.push_back(input_col_desc.getScanDesc());
75  } else {
76  CHECK(input_col_desc.getScanDesc() == input_descs.front());
77  }
78  input_col_descs.push_back(std::make_shared<const InputColDescriptor>(input_col_desc));
79  }
80  const auto count_expr =
81  makeExpr<Analyzer::AggExpr>(SQLTypeInfo(g_bigint_count ? kBIGINT : kINT, false),
82  kCOUNT,
83  nullptr,
84  false,
85  nullptr);
86  RelAlgExecutionUnit ra_exe_unit{input_descs,
87  input_col_descs,
88  {},
89  quals,
90  {},
91  {},
92  {count_expr.get()},
93  nullptr,
94  {{}, SortAlgorithm::Default, 0, 0},
95  0};
96  size_t one{1};
97  ResultSetPtr filtered_result;
98  const auto table_infos = get_table_infos(input_descs, executor_);
99  CHECK_EQ(size_t(1), table_infos.size());
100  const size_t total_rows_upper_bound = table_infos.front().info.getNumTuplesUpperBound();
101  try {
102  ColumnCacheMap column_cache;
103  filtered_result = executor_->executeWorkUnit(
104  one, true, table_infos, ra_exe_unit, co, eo, cat_, nullptr, false, column_cache);
105  } catch (...) {
106  return {false, 1.0, 0};
107  }
108  const auto count_row = filtered_result->getNextRow(false, false);
109  CHECK_EQ(size_t(1), count_row.size());
110  const auto& count_tv = count_row.front();
111  const auto count_scalar_tv = boost::get<ScalarTargetValue>(&count_tv);
112  CHECK(count_scalar_tv);
113  const auto count_ptr = boost::get<int64_t>(count_scalar_tv);
114  CHECK(count_ptr);
115  const auto rows_passing = *count_ptr;
116  const auto rows_total = std::max(total_rows_upper_bound, size_t(1));
117  return {true, static_cast<float>(rows_passing) / rows_total, total_rows_upper_bound};
118 }
#define CHECK_EQ(x, y)
Definition: Logger.h:205
std::unordered_map< int, std::unordered_map< int, std::shared_ptr< const ColumnarResults > >> ColumnCacheMap
std::shared_ptr< ResultSet > ResultSetPtr
const Catalog_Namespace::Catalog & cat_
bool g_bigint_count
Definition: sqldefs.h:76
#define CHECK(condition)
Definition: Logger.h:197
std::vector< InputTableInfo > get_table_infos(const std::vector< InputDescriptor > &input_descs, Executor *executor)
Definition: sqltypes.h:47
Executor * executor_
+ Here is the call graph for this function:

◆ getNDVEstimation()

size_t RelAlgExecutor::getNDVEstimation ( const WorkUnit work_unit,
const int64_t  range,
const bool  is_agg,
const CompilationOptions co,
const ExecutionOptions eo 
)
private

Definition at line 47 of file CardinalityEstimator.cpp.

References create_ndv_execution_unit(), Executor::ERR_INTERRUPTED, Executor::ERR_OUT_OF_TIME, RelAlgExecutor::WorkUnit::exe_unit, get_table_infos(), QueryExecutionError::getErrorCode(), and UNREACHABLE.

51  {
52  const auto estimator_exe_unit = create_ndv_execution_unit(work_unit.exe_unit, range);
53  size_t one{1};
54  ColumnCacheMap column_cache;
55  try {
56  const auto estimator_result =
57  executor_->executeWorkUnit(one,
58  is_agg,
59  get_table_infos(work_unit.exe_unit, executor_),
60  estimator_exe_unit,
61  co,
62  eo,
63  cat_,
64  nullptr,
65  false,
66  column_cache);
67  if (!estimator_result) {
68  return 1;
69  }
70  return std::max(estimator_result->getNDVEstimator(), size_t(1));
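71  } catch (const QueryExecutionError& e) {
72  if (e.getErrorCode() == Executor::ERR_OUT_OF_TIME) {
73  throw std::runtime_error("Cardinality estimation query ran out of time");
74  }
75  if (e.getErrorCode() == Executor::ERR_INTERRUPTED) {
76  throw std::runtime_error("Cardinality estimation query has been interrupted");
77  }
78  throw std::runtime_error("Failed to run the cardinality estimation query: " +
79  getErrorMessageFromCode(e.getErrorCode()));
80  }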
81  UNREACHABLE();
82  return 1;
83 }
bool is_agg(const Analyzer::Expr *expr)
int32_t getErrorCode() const
Definition: ErrorHandling.h:55
static const int32_t ERR_INTERRUPTED
Definition: Execute.h:989
std::unordered_map< int, std::unordered_map< int, std::shared_ptr< const ColumnarResults > >> ColumnCacheMap
#define UNREACHABLE()
Definition: Logger.h:241
const Catalog_Namespace::Catalog & cat_
static const int32_t ERR_OUT_OF_TIME
Definition: Execute.h:988
RelAlgExecutionUnit create_ndv_execution_unit(const RelAlgExecutionUnit &ra_exe_unit, const int64_t range)
std::vector< InputTableInfo > get_table_infos(const std::vector< InputDescriptor > &input_descs, Executor *executor)
static std::string getErrorMessageFromCode(const int32_t error_code)
Executor * executor_
+ Here is the call graph for this function:

◆ getOuterFragmentCount()

size_t RelAlgExecutor::getOuterFragmentCount ( const CompilationOptions co,
const ExecutionOptions eo 
)

Definition at line 73 of file RelAlgExecutor.cpp.

References CHECK, Default, ResultSet::executor_, ExecutionOptions::find_push_down_candidates, get_frag_count_of_table(), anonymous_namespace{RelAlgExecutor.cpp}::get_physical_inputs(), get_physical_table_inputs(), ExecutionOptions::just_explain, and WindowProjectNodeContext::reset().

74  {
75  if (eo.find_push_down_candidates) {
76  return 0;
77  }
78 
79  if (eo.just_explain) {
80  return 0;
81  }
82 
84 
85  query_dag_->resetQueryExecutionState();
86  const auto& ra = query_dag_->getRootNode();
87 
88  mapd_shared_lock<mapd_shared_mutex> lock(executor_->execute_mutex_);
89  ScopeGuard row_set_holder = [this] { cleanupPostExecution(); };
90  const auto phys_inputs = get_physical_inputs(cat_, &ra);
91  const auto phys_table_ids = get_physical_table_inputs(&ra);
92  executor_->setCatalog(&cat_);
93  executor_->setupCaching(phys_inputs, phys_table_ids);
94 
95  ScopeGuard restore_metainfo_cache = [this] { executor_->clearMetaInfoCache(); };
96  auto ed_seq = RaExecutionSequence(&ra);
97 
98  if (!getSubqueries().empty()) {
99  return 0;
100  }
101 
102  CHECK(!ed_seq.empty());
103  if (ed_seq.size() > 1) {
104  return 0;
105  }
106 
107  decltype(temporary_tables_)().swap(temporary_tables_);
108  decltype(target_exprs_owned_)().swap(target_exprs_owned_);
109  executor_->catalog_ = &cat_;
110  executor_->temporary_tables_ = &temporary_tables_;
111 
112  WindowProjectNodeContext::reset(executor_);
113  auto exec_desc_ptr = ed_seq.getDescriptor(0);
114  CHECK(exec_desc_ptr);
115  auto& exec_desc = *exec_desc_ptr;
116  const auto body = exec_desc.getBody();
117  if (body->isNop()) {
118  return 0;
119  }
120 
121  const auto project = dynamic_cast<const RelProject*>(body);
122  if (project) {
123  auto work_unit =
124  createProjectWorkUnit(project, {{}, SortAlgorithm::Default, 0, 0}, eo);
125 
126  return get_frag_count_of_table(work_unit.exe_unit.input_descs[0].getTableId(),
127  executor_);
128  }
129 
130  const auto compound = dynamic_cast<const RelCompound*>(body);
131  if (compound) {
132  if (compound->isDeleteViaSelect()) {
133  return 0;
134  } else if (compound->isUpdateViaSelect()) {
135  return 0;
136  } else {
137  if (compound->isAggregate()) {
138  return 0;
139  }
140 
141  const auto work_unit =
142  createCompoundWorkUnit(compound, {{}, SortAlgorithm::Default, 0, 0}, eo);
143 
144  return get_frag_count_of_table(work_unit.exe_unit.input_descs[0].getTableId(),
145  executor_);
146  }
147  }
148 
149  return 0;
150 }
WorkUnit createProjectWorkUnit(const RelProject *, const SortInfo &, const ExecutionOptions &eo)
TemporaryTables temporary_tables_
const std::vector< std::shared_ptr< RexSubQuery > > & getSubqueries() const noexcept
WorkUnit createCompoundWorkUnit(const RelCompound *, const SortInfo &, const ExecutionOptions &eo)
std::vector< std::shared_ptr< Analyzer::Expr > > target_exprs_owned_
const bool find_push_down_candidates
static void reset(Executor *executor)
A container for relational algebra descriptors defining the execution order for a relational algebra ...
const Catalog_Namespace::Catalog & cat_
std::unordered_set< PhysicalInput > get_physical_inputs(const Catalog_Namespace::Catalog &cat, const RelAlgNode *ra)
#define CHECK(condition)
Definition: Logger.h:197
std::unique_ptr< RelAlgDagBuilder > query_dag_
void cleanupPostExecution()
size_t get_frag_count_of_table(const int table_id, Executor *executor)
Executor * executor_
std::unordered_set< int > get_physical_table_inputs(const RelAlgNode *ra)
+ Here is the call graph for this function:

◆ getParsedQueryHints()

const QueryHint RelAlgExecutor::getParsedQueryHints ( ) const
inline

Definition at line 143 of file RelAlgExecutor.h.

References CHECK, and error_code.

Referenced by DBHandler::execute_rel_alg(), DBHandler::execute_rel_alg_df(), Parser::LocalConnector::getOuterFragmentCount(), QueryRunner::QueryRunner::getParsedQueryHintofQuery(), QueryRunner::anonymous_namespace{QueryRunner.cpp}::run_select_query_with_filter_push_down(), and QueryRunner::QueryRunner::runSelectQuery().

143  {
144  CHECK(query_dag_);
145  return query_dag_->getQueryHints();
146  }
#define CHECK(condition)
Definition: Logger.h:197
std::unique_ptr< RelAlgDagBuilder > query_dag_
+ Here is the caller graph for this function:

◆ getRootRelAlgNode()

const RelAlgNode& RelAlgExecutor::getRootRelAlgNode ( ) const
inline

Definition at line 135 of file RelAlgExecutor.h.

References CHECK.

135  {
136  CHECK(query_dag_);
137  return query_dag_->getRootNode();
138  }
#define CHECK(condition)
Definition: Logger.h:197
std::unique_ptr< RelAlgDagBuilder > query_dag_

◆ getSubqueries()

const std::vector<std::shared_ptr<RexSubQuery> >& RelAlgExecutor::getSubqueries ( ) const
inline noexcept

Definition at line 139 of file RelAlgExecutor.h.

References CHECK.

139  {
140  CHECK(query_dag_);
141  return query_dag_->getSubqueries();
142  };
#define CHECK(condition)
Definition: Logger.h:197
std::unique_ptr< RelAlgDagBuilder > query_dag_

◆ handleNop()

void RelAlgExecutor::handleNop ( RaExecutionDesc ed)
private

Definition at line 726 of file RelAlgExecutor.cpp.

References CHECK, CHECK_EQ, RaExecutionDesc::getBody(), and RaExecutionDesc::setResult().

726  {
727  // just set the result of the previous node as the result of no op
728  auto body = ed.getBody();
729  CHECK(dynamic_cast<const RelAggregate*>(body));
730  CHECK_EQ(size_t(1), body->inputCount());
731  const auto input = body->getInput(0);
732  body->setOutputMetainfo(input->getOutputMetainfo());
733  const auto it = temporary_tables_.find(-input->getId());
734  CHECK(it != temporary_tables_.end());
735  // set up temp table as it could be used by the outer query or next step
736  addTemporaryTable(-body->getId(), it->second);
737 
738  ed.setResult({it->second, input->getOutputMetainfo()});
739 }
#define CHECK_EQ(x, y)
Definition: Logger.h:205
TemporaryTables temporary_tables_
void addTemporaryTable(const int table_id, const ResultSetPtr &result)
const RelAlgNode * getBody() const
#define CHECK(condition)
Definition: Logger.h:197
void setResult(const ExecutionResult &result)
+ Here is the call graph for this function:

◆ handleOutOfMemoryRetry()

ExecutionResult RelAlgExecutor::handleOutOfMemoryRetry ( const RelAlgExecutor::WorkUnit work_unit,
const std::vector< TargetMetaInfo > &  targets_meta,
const bool  is_agg,
const CompilationOptions co,
const ExecutionOptions eo,
RenderInfo render_info,
const bool  was_multifrag_kernel_launch,
const int64_t  queue_time_ms 
)
private

Definition at line 2924 of file RelAlgExecutor.cpp.

References ExecutionOptions::allow_loop_joins, CHECK, anonymous_namespace{RelAlgExecutor.cpp}::decide_approx_count_distinct_implementation(), CompilationOptions::device_type, ExecutionOptions::dynamic_watchdog_time_limit, RelAlgExecutor::WorkUnit::exe_unit, ResultSet::executor_, ExecutionOptions::executor_type, g_enable_watchdog, get_table_infos(), QueryExecutionError::getErrorCode(), ExecutionOptions::gpu_input_mem_limit_percent, ExecutionOptions::jit_debug, LOG, CompilationOptions::makeCpuOnly(), RelAlgExecutor::WorkUnit::max_groups_buffer_entry_guess, ExecutionOptions::outer_fragment_indices, ExecutionOptions::output_columnar_hint, run_benchmark_import::result, ExecutionOptions::runtime_query_interrupt_frequency, RenderInfo::setForceNonInSituData(), RelAlgExecutionUnit::use_bump_allocator, VLOG, logger::WARNING, ExecutionOptions::with_dynamic_watchdog, and ExecutionOptions::with_watchdog.

2932  {
2933  // Disable the bump allocator
2934  // Note that this will have basically the same affect as using the bump allocator for
2935  // the kernel per fragment path. Need to unify the max_groups_buffer_entry_guess = 0
2936  // path and the bump allocator path for kernel per fragment execution.
2937  auto ra_exe_unit_in = work_unit.exe_unit;
2938  ra_exe_unit_in.use_bump_allocator = false;
2939 
2940  auto result = ExecutionResult{std::make_shared<ResultSet>(std::vector<TargetInfo>{},
2941  co.device_type,
2943  nullptr,
2944  executor_),
2945  {}};
2946 
2947  const auto table_infos = get_table_infos(ra_exe_unit_in, executor_);
2948  auto max_groups_buffer_entry_guess = work_unit.max_groups_buffer_entry_guess;
2949  ExecutionOptions eo_no_multifrag{eo.output_columnar_hint,
2950  false,
2951  false,
2952  eo.allow_loop_joins,
2953  eo.with_watchdog,
2954  eo.jit_debug,
2955  false,
2958  false,
2959  false,
2961  false,
2963  eo.executor_type,
2965 
2966  if (was_multifrag_kernel_launch) {
2967  try {
2968  // Attempt to retry using the kernel per fragment path. The smaller input size
2969  // required may allow the entire kernel to execute in GPU memory.
2970  LOG(WARNING) << "Multifrag query ran out of memory, retrying with multifragment "
2971  "kernels disabled.";
2972  const auto ra_exe_unit = decide_approx_count_distinct_implementation(
2973  ra_exe_unit_in, table_infos, executor_, co.device_type, target_exprs_owned_);
2974  ColumnCacheMap column_cache;
2975  result = {executor_->executeWorkUnit(max_groups_buffer_entry_guess,
2976  is_agg,
2977  table_infos,
2978  ra_exe_unit,
2979  co,
2980  eo_no_multifrag,
2981  cat_,
2982  nullptr,
2983  true,
2984  column_cache),
2985  targets_meta};
2986  result.setQueueTime(queue_time_ms);
2987  } catch (const QueryExecutionError& e) {
2989  LOG(WARNING) << "Kernel per fragment query ran out of memory, retrying on CPU.";
2990  }
2991  }
2992 
2993  if (render_info) {
2994  render_info->setForceNonInSituData();
2995  }
2996 
2997  const auto co_cpu = CompilationOptions::makeCpuOnly(co);
2998  // Only reset the group buffer entry guess if we ran out of slots, which
2999  // suggests a
3000  // highly pathological input which prevented a good estimation of distinct tuple
3001  // count. For projection queries, this will force a per-fragment scan limit, which is
3002  // compatible with the CPU path
3003  VLOG(1) << "Resetting max groups buffer entry guess.";
3004  max_groups_buffer_entry_guess = 0;
3005 
3006  int iteration_ctr = -1;
3007  while (true) {
3008  iteration_ctr++;
3010  ra_exe_unit_in, table_infos, executor_, co_cpu.device_type, target_exprs_owned_);
3011  ColumnCacheMap column_cache;
3012  try {
3013  result = {executor_->executeWorkUnit(max_groups_buffer_entry_guess,
3014  is_agg,
3015  table_infos,
3016  ra_exe_unit,
3017  co_cpu,
3018  eo_no_multifrag,
3019  cat_,
3020  nullptr,
3021  true,
3022  column_cache),
3023  targets_meta};
3024  } catch (const QueryExecutionError& e) {
3025  // Ran out of slots