OmniSciDB  d2f719934e
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
RelAlgExecutor Class Reference

#include <RelAlgExecutor.h>

+ Inheritance diagram for RelAlgExecutor:
+ Collaboration diagram for RelAlgExecutor:

Classes

struct  TableFunctionWorkUnit
 
struct  WorkUnit
 

Public Types

using TargetInfoList = std::vector< TargetInfo >
 

Public Member Functions

 RelAlgExecutor (Executor *executor, const Catalog_Namespace::Catalog &cat, std::shared_ptr< const query_state::QueryState > query_state=nullptr)
 
 RelAlgExecutor (Executor *executor, const Catalog_Namespace::Catalog &cat, const std::string &query_ra, std::shared_ptr< const query_state::QueryState > query_state=nullptr)
 
 RelAlgExecutor (Executor *executor, const Catalog_Namespace::Catalog &cat, std::unique_ptr< RelAlgDagBuilder > query_dag, std::shared_ptr< const query_state::QueryState > query_state=nullptr)
 
size_t getOuterFragmentCount (const CompilationOptions &co, const ExecutionOptions &eo)
 
ExecutionResult executeRelAlgQuery (const CompilationOptions &co, const ExecutionOptions &eo, const bool just_explain_plan, RenderInfo *render_info)
 
ExecutionResult executeRelAlgQueryWithFilterPushDown (const RaExecutionSequence &seq, const CompilationOptions &co, const ExecutionOptions &eo, RenderInfo *render_info, const int64_t queue_time_ms)
 
void prepareLeafExecution (const AggregatedColRange &agg_col_range, const StringDictionaryGenerations &string_dictionary_generations, const TableGenerations &table_generations)
 
ExecutionResult executeRelAlgSeq (const RaExecutionSequence &seq, const CompilationOptions &co, const ExecutionOptions &eo, RenderInfo *render_info, const int64_t queue_time_ms, const bool with_existing_temp_tables=false)
 
ExecutionResult executeRelAlgSubSeq (const RaExecutionSequence &seq, const std::pair< size_t, size_t > interval, const CompilationOptions &co, const ExecutionOptions &eo, RenderInfo *render_info, const int64_t queue_time_ms)
 
QueryStepExecutionResult executeRelAlgQuerySingleStep (const RaExecutionSequence &seq, const size_t step_idx, const CompilationOptions &co, const ExecutionOptions &eo, RenderInfo *render_info)
 
void addLeafResult (const unsigned id, const AggregatedResult &result)
 
const RelAlgNode & getRootRelAlgNode () const
 
std::shared_ptr< const RelAlgNode > getRootRelAlgNodeShPtr () const
 
std::pair< std::vector
< unsigned >
, std::unordered_map< unsigned,
JoinQualsPerNestingLevel > > 
getJoinInfo (const RelAlgNode *root_node)
 
std::shared_ptr< RelAlgTranslator > getRelAlgTranslator (const RelAlgNode *root_node)
 
const std::vector
< std::shared_ptr< RexSubQuery > > & 
getSubqueries () const noexcept
 
std::optional
< RegisteredQueryHint > 
getParsedQueryHint (const RelAlgNode *node)
 
std::optional
< std::unordered_map< size_t,
std::unordered_map< unsigned,
RegisteredQueryHint > > > 
getParsedQueryHints ()
 
std::optional
< RegisteredQueryHint > 
getGlobalQueryHint ()
 
ExecutionResult executeSimpleInsert (const Analyzer::Query &insert_query)
 
AggregatedColRange computeColRangesCache ()
 
StringDictionaryGenerations computeStringDictionaryGenerations ()
 
TableGenerations computeTableGenerations ()
 
Executor * getExecutor () const
 
void cleanupPostExecution ()
 
void executePostExecutionCallback ()
 

Static Public Member Functions

static std::string getErrorMessageFromCode (const int32_t error_code)
 

Private Member Functions

ExecutionResult executeRelAlgQueryNoRetry (const CompilationOptions &co, const ExecutionOptions &eo, const bool just_explain_plan, RenderInfo *render_info)
 
void executeRelAlgStep (const RaExecutionSequence &seq, const size_t step_idx, const CompilationOptions &, const ExecutionOptions &, RenderInfo *, const int64_t queue_time_ms)
 
void executeUpdate (const RelAlgNode *node, const CompilationOptions &co, const ExecutionOptions &eo, const int64_t queue_time_ms)
 
void executeDelete (const RelAlgNode *node, const CompilationOptions &co, const ExecutionOptions &eo_in, const int64_t queue_time_ms)
 
ExecutionResult executeCompound (const RelCompound *, const CompilationOptions &, const ExecutionOptions &, RenderInfo *, const int64_t queue_time_ms)
 
ExecutionResult executeAggregate (const RelAggregate *aggregate, const CompilationOptions &co, const ExecutionOptions &eo, RenderInfo *render_info, const int64_t queue_time_ms)
 
ExecutionResult executeProject (const RelProject *, const CompilationOptions &, const ExecutionOptions &, RenderInfo *, const int64_t queue_time_ms, const std::optional< size_t > previous_count)
 
ExecutionResult executeTableFunction (const RelTableFunction *, const CompilationOptions &, const ExecutionOptions &, const int64_t queue_time_ms)
 
void computeWindow (const RelAlgExecutionUnit &ra_exe_unit, const CompilationOptions &co, const ExecutionOptions &eo, ColumnCacheMap &column_cache_map, const int64_t queue_time_ms)
 
std::unique_ptr
< WindowFunctionContext > 
createWindowFunctionContext (const Analyzer::WindowFunction *window_func, const std::shared_ptr< Analyzer::BinOper > &partition_key_cond, const RelAlgExecutionUnit &ra_exe_unit, const std::vector< InputTableInfo > &query_infos, const CompilationOptions &co, ColumnCacheMap &column_cache_map, std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner)
 
ExecutionResult executeFilter (const RelFilter *, const CompilationOptions &, const ExecutionOptions &, RenderInfo *, const int64_t queue_time_ms)
 
ExecutionResult executeSort (const RelSort *, const CompilationOptions &, const ExecutionOptions &, RenderInfo *, const int64_t queue_time_ms)
 
ExecutionResult executeLogicalValues (const RelLogicalValues *, const ExecutionOptions &)
 
ExecutionResult executeModify (const RelModify *modify, const ExecutionOptions &eo)
 
ExecutionResult executeUnion (const RelLogicalUnion *, const RaExecutionSequence &, const CompilationOptions &, const ExecutionOptions &, RenderInfo *, const int64_t queue_time_ms)
 
WorkUnit createSortInputWorkUnit (const RelSort *, const ExecutionOptions &eo)
 
ExecutionResult executeWorkUnit (const WorkUnit &work_unit, const std::vector< TargetMetaInfo > &targets_meta, const bool is_agg, const CompilationOptions &co_in, const ExecutionOptions &eo_in, RenderInfo *, const int64_t queue_time_ms, const std::optional< size_t > previous_count=std::nullopt)
 
size_t getNDVEstimation (const WorkUnit &work_unit, const int64_t range, const bool is_agg, const CompilationOptions &co, const ExecutionOptions &eo)
 
std::optional< size_t > getFilteredCountAll (const WorkUnit &work_unit, const bool is_agg, const CompilationOptions &co, const ExecutionOptions &eo)
 
FilterSelectivity getFilterSelectivity (const std::vector< std::shared_ptr< Analyzer::Expr >> &filter_expressions, const CompilationOptions &co, const ExecutionOptions &eo)
 
std::vector< PushedDownFilterInfo > selectFiltersToBePushedDown (const RelAlgExecutor::WorkUnit &work_unit, const CompilationOptions &co, const ExecutionOptions &eo)
 
bool isRowidLookup (const WorkUnit &work_unit)
 
ExecutionResult handleOutOfMemoryRetry (const RelAlgExecutor::WorkUnit &work_unit, const std::vector< TargetMetaInfo > &targets_meta, const bool is_agg, const CompilationOptions &co, const ExecutionOptions &eo, RenderInfo *render_info, const bool was_multifrag_kernel_launch, const int64_t queue_time_ms)
 
WorkUnit createWorkUnit (const RelAlgNode *, const SortInfo &, const ExecutionOptions &eo)
 
WorkUnit createCompoundWorkUnit (const RelCompound *, const SortInfo &, const ExecutionOptions &eo)
 
WorkUnit createAggregateWorkUnit (const RelAggregate *, const SortInfo &, const bool just_explain)
 
WorkUnit createProjectWorkUnit (const RelProject *, const SortInfo &, const ExecutionOptions &eo)
 
WorkUnit createFilterWorkUnit (const RelFilter *, const SortInfo &, const bool just_explain)
 
WorkUnit createJoinWorkUnit (const RelJoin *, const SortInfo &, const bool just_explain)
 
WorkUnit createUnionWorkUnit (const RelLogicalUnion *, const SortInfo &, const ExecutionOptions &eo)
 
TableFunctionWorkUnit createTableFunctionWorkUnit (const RelTableFunction *table_func, const bool just_explain, const bool is_gpu)
 
void addTemporaryTable (const int table_id, const ResultSetPtr &result)
 
void eraseFromTemporaryTables (const int table_id)
 
void handleNop (RaExecutionDesc &ed)
 
std::unordered_map< unsigned,
JoinQualsPerNestingLevel > & 
getLeftDeepJoinTreesInfo ()
 
JoinQualsPerNestingLevel translateLeftDeepJoinFilter (const RelLeftDeepInnerJoin *join, const std::vector< InputDescriptor > &input_descs, const std::unordered_map< const RelAlgNode *, int > &input_to_nest_level, const bool just_explain)
 
std::list< std::shared_ptr
< Analyzer::Expr > > 
makeJoinQuals (const RexScalar *join_condition, const std::vector< JoinType > &join_types, const std::unordered_map< const RelAlgNode *, int > &input_to_nest_level, const bool just_explain) const
 
- Private Member Functions inherited from StorageIOFacility
 StorageIOFacility (Executor *executor, Catalog_Namespace::Catalog const &catalog)
 
StorageIOFacility::UpdateCallback yieldUpdateCallback (UpdateTransactionParameters &update_parameters)
 
StorageIOFacility::UpdateCallback yieldDeleteCallback (DeleteTransactionParameters &delete_parameters)
 

Static Private Member Functions

static void handlePersistentError (const int32_t error_code)
 

Private Attributes

Executor * executor_
 
const Catalog_Namespace::Catalog & cat_
 
std::unique_ptr< RelAlgDagBuilder > query_dag_
 
std::shared_ptr< const
query_state::QueryState > 
query_state_
 
TemporaryTables temporary_tables_
 
time_t now_
 
std::unordered_map< unsigned,
JoinQualsPerNestingLevel > 
left_deep_join_info_
 
std::vector< std::shared_ptr
< Analyzer::Expr > > 
target_exprs_owned_
 
std::unordered_map< unsigned,
AggregatedResult > 
leaf_results_
 
int64_t queue_time_ms_
 
std::unique_ptr
< TransactionParameters > 
dml_transaction_parameters_
 
std::optional< std::function
< void()> > 
post_execution_callback_
 

Static Private Attributes

static SpeculativeTopNBlacklist speculative_topn_blacklist_
 

Friends

class PendingExecutionClosure
 

Additional Inherited Members

- Private Types inherited from StorageIOFacility
using UpdateCallback = UpdateLogForFragment::Callback
 
using TableDescriptorType = TableDescriptor
 
using DeleteVictimOffsetList = std::vector< uint64_t >
 
using UpdateTargetOffsetList = std::vector< uint64_t >
 
using UpdateTargetTypeList = std::vector< TargetMetaInfo >
 
using UpdateTargetColumnNamesList = std::vector< std::string >
 
using TransactionLog = Fragmenter_Namespace::InsertOrderFragmenter::ModifyTransactionTracker
 
using TransactionLogPtr = std::unique_ptr< TransactionLog >
 
using ColumnValidationFunction = std::function< bool(std::string const &)>
 

Detailed Description

Definition at line 48 of file RelAlgExecutor.h.

Member Typedef Documentation

using RelAlgExecutor::TargetInfoList = std::vector<TargetInfo>

Definition at line 50 of file RelAlgExecutor.h.

Constructor & Destructor Documentation

RelAlgExecutor::RelAlgExecutor ( Executor * executor,
const Catalog_Namespace::Catalog & cat,
std::shared_ptr< const query_state::QueryState > query_state = nullptr 
)
inline

Definition at line 52 of file RelAlgExecutor.h.

55  : StorageIOFacility(executor, cat)
56  , executor_(executor)
57  , cat_(cat)
58  , query_state_(std::move(query_state))
59  , now_(0)
60  , queue_time_ms_(0) {}
int64_t queue_time_ms_
StorageIOFacility(Executor *executor, Catalog_Namespace::Catalog const &catalog)
const Catalog_Namespace::Catalog & cat_
std::shared_ptr< const query_state::QueryState > query_state_
Executor * executor_
RelAlgExecutor::RelAlgExecutor ( Executor * executor,
const Catalog_Namespace::Catalog & cat,
const std::string &  query_ra,
std::shared_ptr< const query_state::QueryState > query_state = nullptr 
)
inline

Definition at line 62 of file RelAlgExecutor.h.

66  : StorageIOFacility(executor, cat)
67  , executor_(executor)
68  , cat_(cat)
69  , query_dag_(std::make_unique<RelAlgDagBuilder>(query_ra, cat_, nullptr))
70  , query_state_(std::move(query_state))
71  , now_(0)
72  , queue_time_ms_(0) {}
int64_t queue_time_ms_
StorageIOFacility(Executor *executor, Catalog_Namespace::Catalog const &catalog)
const Catalog_Namespace::Catalog & cat_
std::shared_ptr< const query_state::QueryState > query_state_
std::unique_ptr< RelAlgDagBuilder > query_dag_
Executor * executor_
RelAlgExecutor::RelAlgExecutor ( Executor * executor,
const Catalog_Namespace::Catalog & cat,
std::unique_ptr< RelAlgDagBuilder > query_dag,
std::shared_ptr< const query_state::QueryState > query_state = nullptr 
)
inline

Definition at line 74 of file RelAlgExecutor.h.

78  : StorageIOFacility(executor, cat)
79  , executor_(executor)
80  , cat_(cat)
81  , query_dag_(std::move(query_dag))
82  , query_state_(std::move(query_state))
83  , now_(0)
84  , queue_time_ms_(0) {}
int64_t queue_time_ms_
StorageIOFacility(Executor *executor, Catalog_Namespace::Catalog const &catalog)
const Catalog_Namespace::Catalog & cat_
std::shared_ptr< const query_state::QueryState > query_state_
std::unique_ptr< RelAlgDagBuilder > query_dag_
Executor * executor_

Member Function Documentation

void RelAlgExecutor::addLeafResult ( const unsigned  id,
const AggregatedResult & result 
)
inline

Definition at line 124 of file RelAlgExecutor.h.

References CHECK, and leaf_results_.

124  {
125  const auto it_ok = leaf_results_.emplace(id, result);
126  CHECK(it_ok.second);
127  }
std::unordered_map< unsigned, AggregatedResult > leaf_results_
#define CHECK(condition)
Definition: Logger.h:211
void RelAlgExecutor::addTemporaryTable ( const int  table_id,
const ResultSetPtr & result 
)
inline private

Definition at line 356 of file RelAlgExecutor.h.

References CHECK, CHECK_LT, and temporary_tables_.

Referenced by executeRelAlgStep(), and handleNop().

356  {
357  CHECK_LT(size_t(0), result->colCount());
358  CHECK_LT(table_id, 0);
359  const auto it_ok = temporary_tables_.emplace(table_id, result);
360  CHECK(it_ok.second);
361  }
TemporaryTables temporary_tables_
#define CHECK_LT(x, y)
Definition: Logger.h:221
#define CHECK(condition)
Definition: Logger.h:211

+ Here is the caller graph for this function:

void RelAlgExecutor::cleanupPostExecution ( )

Definition at line 529 of file RelAlgExecutor.cpp.

References CHECK, and executor_.

Referenced by executeRelAlgQueryNoRetry(), and getOuterFragmentCount().

529  {
530  CHECK(executor_);
531  executor_->row_set_mem_owner_ = nullptr;
532 }
#define CHECK(condition)
Definition: Logger.h:211
Executor * executor_

+ Here is the caller graph for this function:

AggregatedColRange RelAlgExecutor::computeColRangesCache ( )

Definition at line 509 of file RelAlgExecutor.cpp.

References cat_, executor_, get_physical_inputs(), and getRootRelAlgNode().

509  {
510  AggregatedColRange agg_col_range_cache;
511  const auto phys_inputs = get_physical_inputs(cat_, &getRootRelAlgNode());
512  return executor_->computeColRangesCache(phys_inputs);
513 }
const Catalog_Namespace::Catalog & cat_
const RelAlgNode & getRootRelAlgNode() const
std::unordered_set< PhysicalInput > get_physical_inputs(const RelAlgNode *ra)
Executor * executor_

+ Here is the call graph for this function:

StringDictionaryGenerations RelAlgExecutor::computeStringDictionaryGenerations ( )

Definition at line 515 of file RelAlgExecutor.cpp.

References cat_, executor_, get_physical_inputs(), and getRootRelAlgNode().

515  {
516  const auto phys_inputs = get_physical_inputs(cat_, &getRootRelAlgNode());
517  return executor_->computeStringDictionaryGenerations(phys_inputs);
518 }
const Catalog_Namespace::Catalog & cat_
const RelAlgNode & getRootRelAlgNode() const
std::unordered_set< PhysicalInput > get_physical_inputs(const RelAlgNode *ra)
Executor * executor_

+ Here is the call graph for this function:

TableGenerations RelAlgExecutor::computeTableGenerations ( )

Definition at line 520 of file RelAlgExecutor.cpp.

References executor_, get_physical_table_inputs(), and getRootRelAlgNode().

520  {
521  const auto phys_table_ids = get_physical_table_inputs(&getRootRelAlgNode());
522  return executor_->computeTableGenerations(phys_table_ids);
523 }
const RelAlgNode & getRootRelAlgNode() const
Executor * executor_
std::unordered_set< int > get_physical_table_inputs(const RelAlgNode *ra)

+ Here is the call graph for this function:

void RelAlgExecutor::computeWindow ( const RelAlgExecutionUnit & ra_exe_unit,
const CompilationOptions & co,
const ExecutionOptions & eo,
ColumnCacheMap & column_cache_map,
const int64_t  queue_time_ms 
)
private

Definition at line 2016 of file RelAlgExecutor.cpp.

References CHECK_EQ, WindowProjectNodeContext::create(), createWindowFunctionContext(), executor_, ExecutionOptions::executor_type, Extern, get_table_infos(), Analyzer::WindowFunction::getPartitionKeys(), RelAlgExecutionUnit::input_descs, kBOOLEAN, kBW_EQ, kONE, RelAlgExecutionUnit::target_exprs, and anonymous_namespace{RelAlgExecutor.cpp}::transform_to_inner().

Referenced by executeWorkUnit().

2020  {
2021  auto query_infos = get_table_infos(ra_exe_unit.input_descs, executor_);
2022  CHECK_EQ(query_infos.size(), size_t(1));
2023  if (query_infos.front().info.fragments.size() != 1) {
2024  throw std::runtime_error(
2025  "Only single fragment tables supported for window functions for now");
2026  }
2027  if (eo.executor_type == ::ExecutorType::Extern) {
2028  return;
2029  }
2030  query_infos.push_back(query_infos.front());
2031  auto window_project_node_context = WindowProjectNodeContext::create(executor_);
2032  for (size_t target_index = 0; target_index < ra_exe_unit.target_exprs.size();
2033  ++target_index) {
2034  const auto& target_expr = ra_exe_unit.target_exprs[target_index];
2035  const auto window_func = dynamic_cast<const Analyzer::WindowFunction*>(target_expr);
2036  if (!window_func) {
2037  continue;
2038  }
2039  // Always use baseline layout hash tables for now, make the expression a tuple.
2040  const auto& partition_keys = window_func->getPartitionKeys();
2041  std::shared_ptr<Analyzer::BinOper> partition_key_cond;
2042  if (partition_keys.size() >= 1) {
2043  std::shared_ptr<Analyzer::Expr> partition_key_tuple;
2044  if (partition_keys.size() > 1) {
2045  partition_key_tuple = makeExpr<Analyzer::ExpressionTuple>(partition_keys);
2046  } else {
2047  CHECK_EQ(partition_keys.size(), size_t(1));
2048  partition_key_tuple = partition_keys.front();
2049  }
2050  // Creates a tautology equality with the partition expression on both sides.
2051  partition_key_cond =
2052  makeExpr<Analyzer::BinOper>(kBOOLEAN,
2053  kBW_EQ,
2054  kONE,
2055  partition_key_tuple,
2056  transform_to_inner(partition_key_tuple.get()));
2057  }
2058  auto context =
2059  createWindowFunctionContext(window_func,
2060  partition_key_cond /*nullptr if no partition key*/,
2061  ra_exe_unit,
2062  query_infos,
2063  co,
2064  column_cache_map,
2065  executor_->getRowSetMemoryOwner());
2066  context->compute();
2067  window_project_node_context->addWindowFunctionContext(std::move(context),
2068  target_index);
2069  }
2070 }
std::vector< Analyzer::Expr * > target_exprs
#define CHECK_EQ(x, y)
Definition: Logger.h:219
std::vector< InputDescriptor > input_descs
static WindowProjectNodeContext * create(Executor *executor)
std::shared_ptr< Analyzer::Expr > transform_to_inner(const Analyzer::Expr *expr)
ExecutorType executor_type
Definition: sqldefs.h:69
std::vector< InputTableInfo > get_table_infos(const std::vector< InputDescriptor > &input_descs, Executor *executor)
Definition: sqldefs.h:31
const std::vector< std::shared_ptr< Analyzer::Expr > > & getPartitionKeys() const
Definition: Analyzer.h:1611
Executor * executor_
std::unique_ptr< WindowFunctionContext > createWindowFunctionContext(const Analyzer::WindowFunction *window_func, const std::shared_ptr< Analyzer::BinOper > &partition_key_cond, const RelAlgExecutionUnit &ra_exe_unit, const std::vector< InputTableInfo > &query_infos, const CompilationOptions &co, ColumnCacheMap &column_cache_map, std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner)

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

RelAlgExecutor::WorkUnit RelAlgExecutor::createAggregateWorkUnit ( const RelAggregate * aggregate,
const SortInfo & sort_info,
const bool  just_explain 
)
private

Definition at line 4028 of file RelAlgExecutor.cpp.

References cat_, CHECK_EQ, RegisteredQueryHint::defaults(), executor_, QueryPlanDagExtractor::extractQueryPlanDag(), g_default_max_groups_buffer_entry_guess, anonymous_namespace{RelAlgExecutor.cpp}::get_input_desc(), anonymous_namespace{RelAlgExecutor.cpp}::get_input_nest_levels(), anonymous_namespace{RelAlgExecutor.cpp}::get_join_type(), anonymous_namespace{RelAlgExecutor.cpp}::get_targets_meta(), RelAlgNode::getInput(), getLeftDeepJoinTreesInfo(), RelAlgNode::getOutputMetainfo(), RelAlgNode::inputCount(), now_, query_dag_, query_state_, RelAlgNode::setOutputMetainfo(), anonymous_namespace{RelAlgExecutor.cpp}::synthesize_inputs(), target_exprs_owned_, temporary_tables_, anonymous_namespace{RelAlgExecutor.cpp}::translate_groupby_exprs(), and anonymous_namespace{RelAlgExecutor.cpp}::translate_targets().

Referenced by createWorkUnit(), and executeAggregate().

4031  {
4032  std::vector<InputDescriptor> input_descs;
4033  std::list<std::shared_ptr<const InputColDescriptor>> input_col_descs;
4034  std::vector<std::shared_ptr<RexInput>> used_inputs_owned;
4035  const auto input_to_nest_level = get_input_nest_levels(aggregate, {});
4036  std::tie(input_descs, input_col_descs, used_inputs_owned) =
4037  get_input_desc(aggregate, input_to_nest_level, {}, cat_);
4038  const auto join_type = get_join_type(aggregate);
4039 
4040  RelAlgTranslator translator(cat_,
4041  query_state_,
4042  executor_,
4043  input_to_nest_level,
4044  {join_type},
4045  now_,
4046  just_explain);
4047  CHECK_EQ(size_t(1), aggregate->inputCount());
4048  const auto source = aggregate->getInput(0);
4049  const auto& in_metainfo = source->getOutputMetainfo();
4050  const auto scalar_sources =
4051  synthesize_inputs(aggregate, size_t(0), in_metainfo, input_to_nest_level);
4052  const auto groupby_exprs = translate_groupby_exprs(aggregate, scalar_sources);
4053  const auto target_exprs = translate_targets(
4054  target_exprs_owned_, scalar_sources, groupby_exprs, aggregate, translator);
4055  const auto targets_meta = get_targets_meta(aggregate, target_exprs);
4056  aggregate->setOutputMetainfo(targets_meta);
4057  auto dag_info = QueryPlanDagExtractor::extractQueryPlanDag(aggregate,
4058  cat_,
4059  std::nullopt,
4060  getLeftDeepJoinTreesInfo(),
4061  temporary_tables_,
4062  executor_,
4063  translator);
4064  auto query_hint = RegisteredQueryHint::defaults();
4065  if (query_dag_) {
4066  auto candidate = query_dag_->getQueryHint(aggregate);
4067  if (candidate) {
4068  query_hint = *candidate;
4069  }
4070  }
4071  return {RelAlgExecutionUnit{input_descs,
4072  input_col_descs,
4073  {},
4074  {},
4075  {},
4076  groupby_exprs,
4077  target_exprs,
4078  nullptr,
4079  sort_info,
4080  0,
4081  query_hint,
4082  dag_info.extracted_dag,
4083  dag_info.hash_table_plan_dag,
4084  dag_info.table_id_to_node_map,
4085  false,
4086  std::nullopt,
4087  query_state_},
4088  aggregate,
4089  g_default_max_groups_buffer_entry_guess,
4090  nullptr};
4091 }
#define CHECK_EQ(x, y)
Definition: Logger.h:219
std::unordered_map< const RelAlgNode *, int > get_input_nest_levels(const RelAlgNode *ra_node, const std::vector< size_t > &input_permutation)
std::vector< std::shared_ptr< Analyzer::Expr > > synthesize_inputs(const RelAlgNode *ra_node, const size_t nest_level, const std::vector< TargetMetaInfo > &in_metainfo, const std::unordered_map< const RelAlgNode *, int > &input_to_nest_level)
static ExtractedPlanDag extractQueryPlanDag(const RelAlgNode *node, const Catalog_Namespace::Catalog &catalog, std::optional< unsigned > left_deep_tree_id, std::unordered_map< unsigned, JoinQualsPerNestingLevel > &left_deep_tree_infos, const TemporaryTables &temporary_tables, Executor *executor, const RelAlgTranslator &rel_alg_translator)
TemporaryTables temporary_tables_
std::unordered_map< unsigned, JoinQualsPerNestingLevel > & getLeftDeepJoinTreesInfo()
std::vector< Analyzer::Expr * > translate_targets(std::vector< std::shared_ptr< Analyzer::Expr >> &target_exprs_owned, const std::vector< std::shared_ptr< Analyzer::Expr >> &scalar_sources, const std::list< std::shared_ptr< Analyzer::Expr >> &groupby_exprs, const RelCompound *compound, const RelAlgTranslator &translator, const ExecutorType executor_type)
std::vector< std::shared_ptr< Analyzer::Expr > > target_exprs_owned_
std::list< std::shared_ptr< Analyzer::Expr > > translate_groupby_exprs(const RelCompound *compound, const std::vector< std::shared_ptr< Analyzer::Expr >> &scalar_sources)
const Catalog_Namespace::Catalog & cat_
const RelAlgNode * getInput(const size_t idx) const
JoinType get_join_type(const RelAlgNode *ra)
std::vector< TargetMetaInfo > get_targets_meta(const RA *ra_node, const std::vector< Analyzer::Expr * > &target_exprs)
static RegisteredQueryHint defaults()
Definition: QueryHint.h:222
std::shared_ptr< const query_state::QueryState > query_state_
size_t g_default_max_groups_buffer_entry_guess
Definition: Execute.cpp:105
void setOutputMetainfo(const std::vector< TargetMetaInfo > &targets_metainfo) const
std::unique_ptr< RelAlgDagBuilder > query_dag_
std::tuple< std::vector< InputDescriptor >, std::list< std::shared_ptr< const InputColDescriptor > >, std::vector< std::shared_ptr< RexInput > > > get_input_desc(const RA *ra_node, const std::unordered_map< const RelAlgNode *, int > &input_to_nest_level, const std::vector< size_t > &input_permutation, const Catalog_Namespace::Catalog &cat)
const size_t inputCount() const
const std::vector< TargetMetaInfo > & getOutputMetainfo() const
Executor * executor_

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

RelAlgExecutor::WorkUnit RelAlgExecutor::createCompoundWorkUnit ( const RelCompound * compound,
const SortInfo & sort_info,
const ExecutionOptions & eo 
)
private

Definition at line 3703 of file RelAlgExecutor.cpp.

References cat_, CHECK_EQ, RegisteredQueryHint::defaults(), anonymous_namespace{RelAlgExecutor.cpp}::do_table_reordering(), EMPTY_QUERY_PLAN, executor_, ExecutionOptions::executor_type, QueryPlanDagExtractor::extractQueryPlanDag(), g_default_max_groups_buffer_entry_guess, g_from_table_reordering, anonymous_namespace{RelAlgExecutor.cpp}::get_input_desc(), anonymous_namespace{RelAlgExecutor.cpp}::get_input_nest_levels(), anonymous_namespace{RelAlgExecutor.cpp}::get_join_type(), anonymous_namespace{RelAlgExecutor.cpp}::get_left_deep_join_input_sizes(), get_table_infos(), anonymous_namespace{RelAlgExecutor.cpp}::get_targets_meta(), RelAlgNode::getInput(), getLeftDeepJoinTreesInfo(), RelAlgNode::inputCount(), anonymous_namespace{RelAlgExecutor.cpp}::is_extracted_dag_valid(), ExecutionOptions::just_explain, LEFT, anonymous_namespace{RelAlgExecutor.cpp}::left_deep_join_types(), now_, shared::printContainer(), query_dag_, query_state_, anonymous_namespace{RelAlgExecutor.cpp}::rewrite_quals(), RelAlgNode::setOutputMetainfo(), RelAlgExecutionUnit::simple_quals, RelCompound::size(), target_exprs_owned_, temporary_tables_, anonymous_namespace{RelAlgExecutor.cpp}::translate_groupby_exprs(), anonymous_namespace{RelAlgExecutor.cpp}::translate_quals(), anonymous_namespace{RelAlgExecutor.cpp}::translate_scalar_sources(), anonymous_namespace{RelAlgExecutor.cpp}::translate_targets(), translateLeftDeepJoinFilter(), and VLOG.

Referenced by createWorkUnit(), executeCompound(), executeDelete(), executeUpdate(), and getOuterFragmentCount().

3706  {
3707  std::vector<InputDescriptor> input_descs;
3708  std::list<std::shared_ptr<const InputColDescriptor>> input_col_descs;
3709  auto input_to_nest_level = get_input_nest_levels(compound, {});
3710  std::tie(input_descs, input_col_descs, std::ignore) =
3711  get_input_desc(compound, input_to_nest_level, {}, cat_);
3712  VLOG(3) << "input_descs=" << shared::printContainer(input_descs);
3713  const auto query_infos = get_table_infos(input_descs, executor_);
3714  CHECK_EQ(size_t(1), compound->inputCount());
3715  const auto left_deep_join =
3716  dynamic_cast<const RelLeftDeepInnerJoin*>(compound->getInput(0));
3717  JoinQualsPerNestingLevel left_deep_join_quals;
3718  const auto join_types = left_deep_join ? left_deep_join_types(left_deep_join)
3719  : std::vector<JoinType>{get_join_type(compound)};
3720  std::vector<size_t> input_permutation;
3721  std::vector<size_t> left_deep_join_input_sizes;
3722  std::optional<unsigned> left_deep_tree_id;
3723  if (left_deep_join) {
3724  left_deep_tree_id = left_deep_join->getId();
3725  left_deep_join_input_sizes = get_left_deep_join_input_sizes(left_deep_join);
3726  left_deep_join_quals = translateLeftDeepJoinFilter(
3727  left_deep_join, input_descs, input_to_nest_level, eo.just_explain);
3728  if (g_from_table_reordering &&
3729  std::find(join_types.begin(), join_types.end(), JoinType::LEFT) ==
3730  join_types.end()) {
3731  input_permutation = do_table_reordering(input_descs,
3732  input_col_descs,
3733  left_deep_join_quals,
3734  input_to_nest_level,
3735  compound,
3736  query_infos,
3737  executor_);
3738  input_to_nest_level = get_input_nest_levels(compound, input_permutation);
3739  std::tie(input_descs, input_col_descs, std::ignore) =
3740  get_input_desc(compound, input_to_nest_level, input_permutation, cat_);
3741  left_deep_join_quals = translateLeftDeepJoinFilter(
3742  left_deep_join, input_descs, input_to_nest_level, eo.just_explain);
3743  }
3744  }
3745  RelAlgTranslator translator(cat_,
3746  query_state_,
3747  executor_,
3748  input_to_nest_level,
3749  join_types,
3750  now_,
3751  eo.just_explain);
3752  const auto scalar_sources =
3753  translate_scalar_sources(compound, translator, eo.executor_type);
3754  const auto groupby_exprs = translate_groupby_exprs(compound, scalar_sources);
3755  const auto quals_cf = translate_quals(compound, translator);
3756  const auto target_exprs = translate_targets(target_exprs_owned_,
3757  scalar_sources,
3758  groupby_exprs,
3759  compound,
3760  translator,
3761  eo.executor_type);
3762  auto query_hint = RegisteredQueryHint::defaults();
3763  if (query_dag_) {
3764  auto candidate = query_dag_->getQueryHint(compound);
3765  if (candidate) {
3766  query_hint = *candidate;
3767  }
3768  }
3769  CHECK_EQ(compound->size(), target_exprs.size());
3770  const RelAlgExecutionUnit exe_unit = {input_descs,
3771  input_col_descs,
3772  quals_cf.simple_quals,
3773  rewrite_quals(quals_cf.quals),
3774  left_deep_join_quals,
3775  groupby_exprs,
3776  target_exprs,
3777  nullptr,
3778  sort_info,
3779  0,
3780  query_hint,
3781  EMPTY_QUERY_PLAN,
3782  {},
3783  {},
3784  false,
3785  std::nullopt,
3786  query_state_};
3787  auto query_rewriter = std::make_unique<QueryRewriter>(query_infos, executor_);
3788  auto rewritten_exe_unit = query_rewriter->rewrite(exe_unit);
3789  const auto targets_meta = get_targets_meta(compound, rewritten_exe_unit.target_exprs);
3790  compound->setOutputMetainfo(targets_meta);
3791  auto& left_deep_trees_info = getLeftDeepJoinTreesInfo();
3792  if (left_deep_tree_id && left_deep_tree_id.has_value()) {
3793  left_deep_trees_info.emplace(left_deep_tree_id.value(),
3794  rewritten_exe_unit.join_quals);
3795  }
3796  auto dag_info = QueryPlanDagExtractor::extractQueryPlanDag(compound,
3797  cat_,
3798  left_deep_tree_id,
3799  left_deep_trees_info,
3800  temporary_tables_,
3801  executor_,
3802  translator);
3803  if (is_extracted_dag_valid(dag_info)) {
3804  rewritten_exe_unit.query_plan_dag = dag_info.extracted_dag;
3805  rewritten_exe_unit.hash_table_build_plan_dag = dag_info.hash_table_plan_dag;
3806  rewritten_exe_unit.table_id_to_node_map = dag_info.table_id_to_node_map;
3807  }
3808  return {rewritten_exe_unit,
3809  compound,
3810  g_default_max_groups_buffer_entry_guess,
3811  std::move(query_rewriter),
3812  input_permutation,
3813  left_deep_join_input_sizes};
3814 }
#define CHECK_EQ(x, y)
Definition: Logger.h:219
std::unordered_map< const RelAlgNode *, int > get_input_nest_levels(const RelAlgNode *ra_node, const std::vector< size_t > &input_permutation)
std::vector< JoinType > left_deep_join_types(const RelLeftDeepInnerJoin *left_deep_join)
JoinType
Definition: sqldefs.h:108
std::vector< size_t > do_table_reordering(std::vector< InputDescriptor > &input_descs, std::list< std::shared_ptr< const InputColDescriptor >> &input_col_descs, const JoinQualsPerNestingLevel &left_deep_join_quals, std::unordered_map< const RelAlgNode *, int > &input_to_nest_level, const RA *node, const std::vector< InputTableInfo > &query_infos, const Executor *executor)
static ExtractedPlanDag extractQueryPlanDag(const RelAlgNode *node, const Catalog_Namespace::Catalog &catalog, std::optional< unsigned > left_deep_tree_id, std::unordered_map< unsigned, JoinQualsPerNestingLevel > &left_deep_tree_infos, const TemporaryTables &temporary_tables, Executor *executor, const RelAlgTranslator &rel_alg_translator)
size_t size() const override
TemporaryTables temporary_tables_
std::unordered_map< unsigned, JoinQualsPerNestingLevel > & getLeftDeepJoinTreesInfo()
std::vector< Analyzer::Expr * > translate_targets(std::vector< std::shared_ptr< Analyzer::Expr >> &target_exprs_owned, const std::vector< std::shared_ptr< Analyzer::Expr >> &scalar_sources, const std::list< std::shared_ptr< Analyzer::Expr >> &groupby_exprs, const RelCompound *compound, const RelAlgTranslator &translator, const ExecutorType executor_type)
std::vector< JoinCondition > JoinQualsPerNestingLevel
std::vector< std::shared_ptr< Analyzer::Expr > > target_exprs_owned_
bool g_from_table_reordering
Definition: Execute.cpp:86
ExecutorType executor_type
std::list< std::shared_ptr< Analyzer::Expr > > translate_groupby_exprs(const RelCompound *compound, const std::vector< std::shared_ptr< Analyzer::Expr >> &scalar_sources)
const Catalog_Namespace::Catalog & cat_
bool is_extracted_dag_valid(ExtractedPlanDag &dag)
const RelAlgNode * getInput(const size_t idx) const
JoinType get_join_type(const RelAlgNode *ra)
std::vector< TargetMetaInfo > get_targets_meta(const RA *ra_node, const std::vector< Analyzer::Expr * > &target_exprs)
static RegisteredQueryHint defaults()
Definition: QueryHint.h:222
std::shared_ptr< const query_state::QueryState > query_state_
std::vector< std::shared_ptr< Analyzer::Expr > > translate_scalar_sources(const RA *ra_node, const RelAlgTranslator &translator, const ::ExecutorType executor_type)
size_t g_default_max_groups_buffer_entry_guess
Definition: Execute.cpp:105
constexpr char const * EMPTY_QUERY_PLAN
std::vector< InputTableInfo > get_table_infos(const std::vector< InputDescriptor > &input_descs, Executor *executor)
void setOutputMetainfo(const std::vector< TargetMetaInfo > &targets_metainfo) const
std::unique_ptr< RelAlgDagBuilder > query_dag_
QualsConjunctiveForm translate_quals(const RelCompound *compound, const RelAlgTranslator &translator)
PrintContainer< CONTAINER > printContainer(CONTAINER &container)
Definition: misc.h:104
std::tuple< std::vector< InputDescriptor >, std::list< std::shared_ptr< const InputColDescriptor > >, std::vector< std::shared_ptr< RexInput > > > get_input_desc(const RA *ra_node, const std::unordered_map< const RelAlgNode *, int > &input_to_nest_level, const std::vector< size_t > &input_permutation, const Catalog_Namespace::Catalog &cat)
const size_t inputCount() const
Executor * executor_
#define VLOG(n)
Definition: Logger.h:305
std::list< std::shared_ptr< Analyzer::Expr > > simple_quals
JoinQualsPerNestingLevel translateLeftDeepJoinFilter(const RelLeftDeepInnerJoin *join, const std::vector< InputDescriptor > &input_descs, const std::unordered_map< const RelAlgNode *, int > &input_to_nest_level, const bool just_explain)
std::vector< size_t > get_left_deep_join_input_sizes(const RelLeftDeepInnerJoin *left_deep_join)
std::list< std::shared_ptr< Analyzer::Expr > > rewrite_quals(const std::list< std::shared_ptr< Analyzer::Expr >> &quals)

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

RelAlgExecutor::WorkUnit RelAlgExecutor::createFilterWorkUnit ( const RelFilter *filter,
const SortInfo &sort_info,
const bool just_explain 
)
private

Definition at line 4527 of file RelAlgExecutor.cpp.

References cat_, CHECK_EQ, RegisteredQueryHint::defaults(), executor_, QueryPlanDagExtractor::extractQueryPlanDag(), fold_expr(), g_default_max_groups_buffer_entry_guess, anonymous_namespace{RelAlgExecutor.cpp}::get_input_desc(), anonymous_namespace{RelAlgExecutor.cpp}::get_input_nest_levels(), anonymous_namespace{RelAlgExecutor.cpp}::get_inputs_meta(), anonymous_namespace{RelAlgExecutor.cpp}::get_join_type(), anonymous_namespace{RelAlgExecutor.cpp}::get_raw_pointers(), RelFilter::getCondition(), getLeftDeepJoinTreesInfo(), RelAlgNode::inputCount(), now_, query_dag_, query_state_, rewrite_expr(), RelAlgNode::setOutputMetainfo(), target_exprs_owned_, and temporary_tables_.

Referenced by createWorkUnit(), and executeFilter().

4529  {
4530  CHECK_EQ(size_t(1), filter->inputCount());
4531  std::vector<InputDescriptor> input_descs;
4532  std::list<std::shared_ptr<const InputColDescriptor>> input_col_descs;
4533  std::vector<TargetMetaInfo> in_metainfo;
4534  std::vector<std::shared_ptr<RexInput>> used_inputs_owned;
4535  std::vector<std::shared_ptr<Analyzer::Expr>> target_exprs_owned;
4536 
4537  const auto input_to_nest_level = get_input_nest_levels(filter, {});
4538  std::tie(input_descs, input_col_descs, used_inputs_owned) =
4539  get_input_desc(filter, input_to_nest_level, {}, cat_);
4540  const auto join_type = get_join_type(filter);
4541  RelAlgTranslator translator(cat_,
4542  query_state_,
4543  executor_,
4544  input_to_nest_level,
4545  {join_type},
4546  now_,
4547  just_explain);
4548  std::tie(in_metainfo, target_exprs_owned) =
4549  get_inputs_meta(filter, translator, used_inputs_owned, input_to_nest_level);
4550  const auto filter_expr = translator.translateScalarRex(filter->getCondition());
4551  const auto qual = fold_expr(filter_expr.get());
4552  target_exprs_owned_.insert(
4553  target_exprs_owned_.end(), target_exprs_owned.begin(), target_exprs_owned.end());
4554  const auto target_exprs = get_raw_pointers(target_exprs_owned);
4555  filter->setOutputMetainfo(in_metainfo);
4556  const auto rewritten_qual = rewrite_expr(qual.get());
4557  auto dag_info = QueryPlanDagExtractor::extractQueryPlanDag(filter,
4558  cat_,
4559  std::nullopt,
4560  getLeftDeepJoinTreesInfo(),
4561  temporary_tables_,
4562  executor_,
4563  translator);
4564  auto query_hint = RegisteredQueryHint::defaults();
4565  if (query_dag_) {
4566  auto candidate = query_dag_->getQueryHint(filter);
4567  if (candidate) {
4568  query_hint = *candidate;
4569  }
4570  }
4571  return {{input_descs,
4572  input_col_descs,
4573  {},
4574  {rewritten_qual ? rewritten_qual : qual},
4575  {},
4576  {nullptr},
4577  target_exprs,
4578  nullptr,
4579  sort_info,
4580  0,
4581  query_hint,
4582  dag_info.extracted_dag,
4583  dag_info.hash_table_plan_dag,
4584  dag_info.table_id_to_node_map},
4585  filter,
4586  g_default_max_groups_buffer_entry_guess,
4587  nullptr};
4588 }
#define CHECK_EQ(x, y)
Definition: Logger.h:219
std::vector< Analyzer::Expr * > get_raw_pointers(std::vector< std::shared_ptr< Analyzer::Expr >> const &input)
std::unordered_map< const RelAlgNode *, int > get_input_nest_levels(const RelAlgNode *ra_node, const std::vector< size_t > &input_permutation)
static ExtractedPlanDag extractQueryPlanDag(const RelAlgNode *node, const Catalog_Namespace::Catalog &catalog, std::optional< unsigned > left_deep_tree_id, std::unordered_map< unsigned, JoinQualsPerNestingLevel > &left_deep_tree_infos, const TemporaryTables &temporary_tables, Executor *executor, const RelAlgTranslator &rel_alg_translator)
std::pair< std::vector< TargetMetaInfo >, std::vector< std::shared_ptr< Analyzer::Expr > > > get_inputs_meta(const RelFilter *filter, const RelAlgTranslator &translator, const std::vector< std::shared_ptr< RexInput >> &inputs_owned, const std::unordered_map< const RelAlgNode *, int > &input_to_nest_level)
TemporaryTables temporary_tables_
const RexScalar * getCondition() const
std::unordered_map< unsigned, JoinQualsPerNestingLevel > & getLeftDeepJoinTreesInfo()
Analyzer::ExpressionPtr rewrite_expr(const Analyzer::Expr *expr)
std::vector< std::shared_ptr< Analyzer::Expr > > target_exprs_owned_
const Catalog_Namespace::Catalog & cat_
JoinType get_join_type(const RelAlgNode *ra)
static RegisteredQueryHint defaults()
Definition: QueryHint.h:222
std::shared_ptr< const query_state::QueryState > query_state_
size_t g_default_max_groups_buffer_entry_guess
Definition: Execute.cpp:105
void setOutputMetainfo(const std::vector< TargetMetaInfo > &targets_metainfo) const
std::unique_ptr< RelAlgDagBuilder > query_dag_
std::tuple< std::vector< InputDescriptor >, std::list< std::shared_ptr< const InputColDescriptor > >, std::vector< std::shared_ptr< RexInput > > > get_input_desc(const RA *ra_node, const std::unordered_map< const RelAlgNode *, int > &input_to_nest_level, const std::vector< size_t > &input_permutation, const Catalog_Namespace::Catalog &cat)
const size_t inputCount() const
Executor * executor_
std::shared_ptr< Analyzer::Expr > fold_expr(const Analyzer::Expr *expr)

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

WorkUnit RelAlgExecutor::createJoinWorkUnit ( const RelJoin *,
const SortInfo &,
const bool just_explain 
)
private
RelAlgExecutor::WorkUnit RelAlgExecutor::createProjectWorkUnit ( const RelProject *project,
const SortInfo &sort_info,
const ExecutionOptions &eo 
)
private

Definition at line 4093 of file RelAlgExecutor.cpp.

References cat_, RegisteredQueryHint::defaults(), anonymous_namespace{RelAlgExecutor.cpp}::do_table_reordering(), EMPTY_QUERY_PLAN, executor_, ExecutionOptions::executor_type, QueryPlanDagExtractor::extractQueryPlanDag(), g_default_max_groups_buffer_entry_guess, g_from_table_reordering, anonymous_namespace{RelAlgExecutor.cpp}::get_input_desc(), anonymous_namespace{RelAlgExecutor.cpp}::get_input_nest_levels(), anonymous_namespace{RelAlgExecutor.cpp}::get_join_type(), anonymous_namespace{RelAlgExecutor.cpp}::get_left_deep_join_input_sizes(), anonymous_namespace{RelAlgExecutor.cpp}::get_raw_pointers(), get_table_infos(), anonymous_namespace{RelAlgExecutor.cpp}::get_targets_meta(), RelAlgNode::getInput(), getLeftDeepJoinTreesInfo(), anonymous_namespace{RelAlgExecutor.cpp}::is_extracted_dag_valid(), ExecutionOptions::just_explain, anonymous_namespace{RelAlgExecutor.cpp}::left_deep_join_types(), now_, query_dag_, query_state_, RelAlgNode::setOutputMetainfo(), target_exprs_owned_, temporary_tables_, anonymous_namespace{RelAlgExecutor.cpp}::translate_scalar_sources(), and translateLeftDeepJoinFilter().

Referenced by createWorkUnit(), executeDelete(), executeProject(), executeUpdate(), and getOuterFragmentCount().

4096  {
4097  std::vector<InputDescriptor> input_descs;
4098  std::list<std::shared_ptr<const InputColDescriptor>> input_col_descs;
4099  auto input_to_nest_level = get_input_nest_levels(project, {});
4100  std::tie(input_descs, input_col_descs, std::ignore) =
4101  get_input_desc(project, input_to_nest_level, {}, cat_);
4102  const auto query_infos = get_table_infos(input_descs, executor_);
4103 
4104  const auto left_deep_join =
4105  dynamic_cast<const RelLeftDeepInnerJoin*>(project->getInput(0));
4106  JoinQualsPerNestingLevel left_deep_join_quals;
4107  const auto join_types = left_deep_join ? left_deep_join_types(left_deep_join)
4108  : std::vector<JoinType>{get_join_type(project)};
4109  std::vector<size_t> input_permutation;
4110  std::vector<size_t> left_deep_join_input_sizes;
4111  std::optional<unsigned> left_deep_tree_id;
4112  if (left_deep_join) {
4113  left_deep_tree_id = left_deep_join->getId();
4114  left_deep_join_input_sizes = get_left_deep_join_input_sizes(left_deep_join);
4115  const auto query_infos = get_table_infos(input_descs, executor_);
4116  left_deep_join_quals = translateLeftDeepJoinFilter(
4117  left_deep_join, input_descs, input_to_nest_level, eo.just_explain);
4118  if (g_from_table_reordering) {
4119  input_permutation = do_table_reordering(input_descs,
4120  input_col_descs,
4121  left_deep_join_quals,
4122  input_to_nest_level,
4123  project,
4124  query_infos,
4125  executor_);
4126  input_to_nest_level = get_input_nest_levels(project, input_permutation);
4127  std::tie(input_descs, input_col_descs, std::ignore) =
4128  get_input_desc(project, input_to_nest_level, input_permutation, cat_);
4129  left_deep_join_quals = translateLeftDeepJoinFilter(
4130  left_deep_join, input_descs, input_to_nest_level, eo.just_explain);
4131  }
4132  }
4133 
4134  RelAlgTranslator translator(cat_,
4135  query_state_,
4136  executor_,
4137  input_to_nest_level,
4138  join_types,
4139  now_,
4140  eo.just_explain);
4141  const auto target_exprs_owned =
4142  translate_scalar_sources(project, translator, eo.executor_type);
4143  target_exprs_owned_.insert(
4144  target_exprs_owned_.end(), target_exprs_owned.begin(), target_exprs_owned.end());
4145  const auto target_exprs = get_raw_pointers(target_exprs_owned);
4146  auto query_hint = RegisteredQueryHint::defaults();
4147  if (query_dag_) {
4148  auto candidate = query_dag_->getQueryHint(project);
4149  if (candidate) {
4150  query_hint = *candidate;
4151  }
4152  }
4153  const RelAlgExecutionUnit exe_unit = {input_descs,
4154  input_col_descs,
4155  {},
4156  {},
4157  left_deep_join_quals,
4158  {nullptr},
4159  target_exprs,
4160  nullptr,
4161  sort_info,
4162  0,
4163  query_hint,
4164  EMPTY_QUERY_PLAN,
4165  {},
4166  {},
4167  false,
4168  std::nullopt,
4169  query_state_};
4170  auto query_rewriter = std::make_unique<QueryRewriter>(query_infos, executor_);
4171  auto rewritten_exe_unit = query_rewriter->rewrite(exe_unit);
4172  const auto targets_meta = get_targets_meta(project, rewritten_exe_unit.target_exprs);
4173  project->setOutputMetainfo(targets_meta);
4174  auto& left_deep_trees_info = getLeftDeepJoinTreesInfo();
4175  if (left_deep_tree_id && left_deep_tree_id.has_value()) {
4176  left_deep_trees_info.emplace(left_deep_tree_id.value(),
4177  rewritten_exe_unit.join_quals);
4178  }
4179  auto dag_info = QueryPlanDagExtractor::extractQueryPlanDag(project,
4180  cat_,
4181  left_deep_tree_id,
4182  left_deep_trees_info,
4183  temporary_tables_,
4184  executor_,
4185  translator);
4186  if (is_extracted_dag_valid(dag_info)) {
4187  rewritten_exe_unit.query_plan_dag = dag_info.extracted_dag;
4188  rewritten_exe_unit.hash_table_build_plan_dag = dag_info.hash_table_plan_dag;
4189  rewritten_exe_unit.table_id_to_node_map = dag_info.table_id_to_node_map;
4190  }
4191  return {rewritten_exe_unit,
4192  project,
4193  g_default_max_groups_buffer_entry_guess,
4194  std::move(query_rewriter),
4195  input_permutation,
4196  left_deep_join_input_sizes};
4197 }
std::vector< Analyzer::Expr * > get_raw_pointers(std::vector< std::shared_ptr< Analyzer::Expr >> const &input)
std::unordered_map< const RelAlgNode *, int > get_input_nest_levels(const RelAlgNode *ra_node, const std::vector< size_t > &input_permutation)
std::vector< JoinType > left_deep_join_types(const RelLeftDeepInnerJoin *left_deep_join)
JoinType
Definition: sqldefs.h:108
std::vector< size_t > do_table_reordering(std::vector< InputDescriptor > &input_descs, std::list< std::shared_ptr< const InputColDescriptor >> &input_col_descs, const JoinQualsPerNestingLevel &left_deep_join_quals, std::unordered_map< const RelAlgNode *, int > &input_to_nest_level, const RA *node, const std::vector< InputTableInfo > &query_infos, const Executor *executor)
static ExtractedPlanDag extractQueryPlanDag(const RelAlgNode *node, const Catalog_Namespace::Catalog &catalog, std::optional< unsigned > left_deep_tree_id, std::unordered_map< unsigned, JoinQualsPerNestingLevel > &left_deep_tree_infos, const TemporaryTables &temporary_tables, Executor *executor, const RelAlgTranslator &rel_alg_translator)
TemporaryTables temporary_tables_
std::unordered_map< unsigned, JoinQualsPerNestingLevel > & getLeftDeepJoinTreesInfo()
std::vector< JoinCondition > JoinQualsPerNestingLevel
std::vector< std::shared_ptr< Analyzer::Expr > > target_exprs_owned_
bool g_from_table_reordering
Definition: Execute.cpp:86
ExecutorType executor_type
const Catalog_Namespace::Catalog & cat_
bool is_extracted_dag_valid(ExtractedPlanDag &dag)
const RelAlgNode * getInput(const size_t idx) const
JoinType get_join_type(const RelAlgNode *ra)
std::vector< TargetMetaInfo > get_targets_meta(const RA *ra_node, const std::vector< Analyzer::Expr * > &target_exprs)
static RegisteredQueryHint defaults()
Definition: QueryHint.h:222
std::shared_ptr< const query_state::QueryState > query_state_
std::vector< std::shared_ptr< Analyzer::Expr > > translate_scalar_sources(const RA *ra_node, const RelAlgTranslator &translator, const ::ExecutorType executor_type)
size_t g_default_max_groups_buffer_entry_guess
Definition: Execute.cpp:105
constexpr char const * EMPTY_QUERY_PLAN
std::vector< InputTableInfo > get_table_infos(const std::vector< InputDescriptor > &input_descs, Executor *executor)
void setOutputMetainfo(const std::vector< TargetMetaInfo > &targets_metainfo) const
std::unique_ptr< RelAlgDagBuilder > query_dag_
std::tuple< std::vector< InputDescriptor >, std::list< std::shared_ptr< const InputColDescriptor > >, std::vector< std::shared_ptr< RexInput > > > get_input_desc(const RA *ra_node, const std::unordered_map< const RelAlgNode *, int > &input_to_nest_level, const std::vector< size_t > &input_permutation, const Catalog_Namespace::Catalog &cat)
Executor * executor_
JoinQualsPerNestingLevel translateLeftDeepJoinFilter(const RelLeftDeepInnerJoin *join, const std::vector< InputDescriptor > &input_descs, const std::unordered_map< const RelAlgNode *, int > &input_to_nest_level, const bool just_explain)
std::vector< size_t > get_left_deep_join_input_sizes(const RelLeftDeepInnerJoin *left_deep_join)

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

RelAlgExecutor::WorkUnit RelAlgExecutor::createSortInputWorkUnit ( const RelSort *sort,
const ExecutionOptions &eo 
)
private

Definition at line 2806 of file RelAlgExecutor.cpp.

References CHECK_GT, RelSort::collationCount(), SpeculativeTopNBlacklist::contains(), createWorkUnit(), Default, anonymous_namespace{RelAlgExecutor.cpp}::first_oe_is_desc(), g_default_max_groups_buffer_entry_guess, anonymous_namespace{RelAlgExecutor.cpp}::get_order_entries(), anonymous_namespace{RelAlgExecutor.cpp}::get_scan_limit(), get_target_info(), RelAlgNode::getInput(), RelSort::getLimit(), RelSort::getOffset(), RelAlgExecutionUnit::input_descs, RelAlgNode::setOutputMetainfo(), speculative_topn_blacklist_, SpeculativeTopN, and StreamingTopN.

Referenced by executeRelAlgQuerySingleStep(), and executeSort().

2808  {
2809  const auto source = sort->getInput(0);
2810  const size_t limit = sort->getLimit();
2811  const size_t offset = sort->getOffset();
2812  const size_t scan_limit = sort->collationCount() ? 0 : get_scan_limit(source, limit);
2813  const size_t scan_total_limit =
2814  scan_limit ? get_scan_limit(source, scan_limit + offset) : 0;
2815  size_t max_groups_buffer_entry_guess{
2816  scan_total_limit ? scan_total_limit : g_default_max_groups_buffer_entry_guess};
2817  SortAlgorithm sort_algorithm{SortAlgorithm::SpeculativeTopN};
2818  const auto order_entries = get_order_entries(sort);
2819  SortInfo sort_info{order_entries, sort_algorithm, limit, offset};
2820  auto source_work_unit = createWorkUnit(source, sort_info, eo);
2821  const auto& source_exe_unit = source_work_unit.exe_unit;
2822 
2823  // we do not allow sorting geometry or array types
2824  for (auto order_entry : order_entries) {
2825  CHECK_GT(order_entry.tle_no, 0); // tle_no is a 1-base index
2826  const auto& te = source_exe_unit.target_exprs[order_entry.tle_no - 1];
2827  const auto& ti = get_target_info(te, false);
2828  if (ti.sql_type.is_geometry() || ti.sql_type.is_array()) {
2829  throw std::runtime_error(
2830  "Columns with geometry or array types cannot be used in an ORDER BY clause.");
2831  }
2832  }
2833 
2834  if (source_exe_unit.groupby_exprs.size() == 1) {
2835  if (!source_exe_unit.groupby_exprs.front()) {
2836  sort_algorithm = SortAlgorithm::StreamingTopN;
2837  } else {
2838  if (speculative_topn_blacklist_.contains(source_exe_unit.groupby_exprs.front(),
2839  first_oe_is_desc(order_entries))) {
2840  sort_algorithm = SortAlgorithm::Default;
2841  }
2842  }
2843  }
2844 
2845  sort->setOutputMetainfo(source->getOutputMetainfo());
2846  // NB: the `body` field of the returned `WorkUnit` needs to be the `source` node,
2847  // not the `sort`. The aggregator needs the pre-sorted result from leaves.
2848  return {RelAlgExecutionUnit{source_exe_unit.input_descs,
2849  std::move(source_exe_unit.input_col_descs),
2850  source_exe_unit.simple_quals,
2851  source_exe_unit.quals,
2852  source_exe_unit.join_quals,
2853  source_exe_unit.groupby_exprs,
2854  source_exe_unit.target_exprs,
2855  nullptr,
2856  {sort_info.order_entries, sort_algorithm, limit, offset},
2857  scan_total_limit,
2858  source_exe_unit.query_hint,
2859  source_exe_unit.query_plan_dag,
2860  source_exe_unit.hash_table_build_plan_dag,
2861  source_exe_unit.table_id_to_node_map,
2862  source_exe_unit.use_bump_allocator,
2863  source_exe_unit.union_all,
2864  source_exe_unit.query_state},
2865  source,
2866  max_groups_buffer_entry_guess,
2867  std::move(source_work_unit.query_rewriter),
2868  source_work_unit.input_permutation,
2869  source_work_unit.left_deep_join_input_sizes};
2870 }
size_t getOffset() const
bool contains(const std::shared_ptr< Analyzer::Expr > expr, const bool desc) const
size_t get_scan_limit(const RelAlgNode *ra, const size_t limit)
TargetInfo get_target_info(const PointerType target_expr, const bool bigint_count)
Definition: TargetInfo.h:92
static SpeculativeTopNBlacklist speculative_topn_blacklist_
std::vector< InputDescriptor > input_descs
#define CHECK_GT(x, y)
Definition: Logger.h:223
WorkUnit createWorkUnit(const RelAlgNode *, const SortInfo &, const ExecutionOptions &eo)
SortAlgorithm
const RelAlgNode * getInput(const size_t idx) const
size_t collationCount() const
size_t getLimit() const
size_t g_default_max_groups_buffer_entry_guess
Definition: Execute.cpp:105
void setOutputMetainfo(const std::vector< TargetMetaInfo > &targets_metainfo) const
std::list< Analyzer::OrderEntry > get_order_entries(const RelSort *sort)
bool first_oe_is_desc(const std::list< Analyzer::OrderEntry > &order_entries)

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

RelAlgExecutor::TableFunctionWorkUnit RelAlgExecutor::createTableFunctionWorkUnit ( const RelTableFunction *table_func,
const bool  just_explain,
const bool  is_gpu 
)
private

Definition at line 4320 of file RelAlgExecutor.cpp.

References bind_table_function(), cat_, CHECK, CHECK_EQ, CHECK_GT, CHECK_LT, RelTableFunction::countRexLiteralArgs(), DEFAULT_ROW_MULTIPLIER_VALUE, executor_, ext_arg_type_to_type_info(), anonymous_namespace{RelAlgExecutor.cpp}::get_input_desc(), anonymous_namespace{RelAlgExecutor.cpp}::get_input_nest_levels(), anonymous_namespace{RelAlgExecutor.cpp}::get_raw_pointers(), get_table_infos(), anonymous_namespace{RelAlgExecutor.cpp}::get_targets_meta(), RelTableFunction::getColInputsSize(), RelTableFunction::getFunctionName(), RelTableFunction::getTableFuncInputAt(), i, kINT, LOG, now_, query_state_, RelAlgNode::setOutputMetainfo(), TableFunctions, TableFunctionExecutionUnit::target_exprs, target_exprs_owned_, to_string(), anonymous_namespace{RelAlgExecutor.cpp}::translate_scalar_sources(), UNREACHABLE, and logger::WARNING.

Referenced by executeTableFunction().

4323  {
4324  std::vector<InputDescriptor> input_descs;
4325  std::list<std::shared_ptr<const InputColDescriptor>> input_col_descs;
4326  auto input_to_nest_level = get_input_nest_levels(rel_table_func, {});
4327  std::tie(input_descs, input_col_descs, std::ignore) =
4328  get_input_desc(rel_table_func, input_to_nest_level, {}, cat_);
4329  const auto query_infos = get_table_infos(input_descs, executor_);
4330  RelAlgTranslator translator(
4331  cat_, query_state_, executor_, input_to_nest_level, {}, now_, just_explain);
4332  const auto input_exprs_owned = translate_scalar_sources(
4333  rel_table_func, translator, ::ExecutorType::TableFunctions);
4334  target_exprs_owned_.insert(
4335  target_exprs_owned_.end(), input_exprs_owned.begin(), input_exprs_owned.end());
4336  auto input_exprs = get_raw_pointers(input_exprs_owned);
4337 
4338  const auto table_function_impl_and_type_infos = [=]() {
4339  if (is_gpu) {
4340  try {
4341  return bind_table_function(
4342  rel_table_func->getFunctionName(), input_exprs_owned, is_gpu);
4343  } catch (ExtensionFunctionBindingError& e) {
4344  LOG(WARNING) << "createTableFunctionWorkUnit[GPU]: " << e.what()
4345  << " Redirecting " << rel_table_func->getFunctionName()
4346  << " step to run on CPU.";
4347  throw QueryMustRunOnCpu();
4348  }
4349  } else {
4350  try {
4351  return bind_table_function(
4352  rel_table_func->getFunctionName(), input_exprs_owned, is_gpu);
4353  } catch (ExtensionFunctionBindingError& e) {
4354  LOG(WARNING) << "createTableFunctionWorkUnit[CPU]: " << e.what();
4355  throw;
4356  }
4357  }
4358  }();
4359  const auto& table_function_impl = std::get<0>(table_function_impl_and_type_infos);
4360  const auto& table_function_type_infos = std::get<1>(table_function_impl_and_type_infos);
4361 
4362  size_t output_row_sizing_param = 0;
4363  if (table_function_impl
4364  .hasUserSpecifiedOutputSizeParameter()) { // constant and row multiplier
4365  const auto parameter_index =
4366  table_function_impl.getOutputRowSizeParameter(table_function_type_infos);
4367  CHECK_GT(parameter_index, size_t(0));
4368  if (rel_table_func->countRexLiteralArgs() == table_function_impl.countScalarArgs()) {
4369  const auto parameter_expr =
4370  rel_table_func->getTableFuncInputAt(parameter_index - 1);
4371  const auto parameter_expr_literal = dynamic_cast<const RexLiteral*>(parameter_expr);
4372  if (!parameter_expr_literal) {
4373  throw std::runtime_error(
4374  "Provided output buffer sizing parameter is not a literal. Only literal "
4375  "values are supported with output buffer sizing configured table "
4376  "functions.");
4377  }
4378  int64_t literal_val = parameter_expr_literal->getVal<int64_t>();
4379  if (literal_val < 0) {
4380  throw std::runtime_error("Provided output sizing parameter " +
4381  std::to_string(literal_val) +
4382  " must be positive integer.");
4383  }
4384  output_row_sizing_param = static_cast<size_t>(literal_val);
4385  } else {
4386  // RowMultiplier not specified in the SQL query. Set it to 1
4387  output_row_sizing_param = 1; // default value for RowMultiplier
4388  static Datum d = {DEFAULT_ROW_MULTIPLIER_VALUE};
4389  static auto DEFAULT_ROW_MULTIPLIER_EXPR =
4390  makeExpr<Analyzer::Constant>(kINT, false, d);
4391  // Push the constant 1 to input_exprs
4392  input_exprs.insert(input_exprs.begin() + parameter_index - 1,
4393  DEFAULT_ROW_MULTIPLIER_EXPR.get());
4394  }
4395  } else if (table_function_impl.hasNonUserSpecifiedOutputSize()) {
4396  output_row_sizing_param = table_function_impl.getOutputRowSizeParameter();
4397  } else {
4398  UNREACHABLE();
4399  }
4400 
4401  std::vector<Analyzer::ColumnVar*> input_col_exprs;
4402  size_t input_index = 0;
4403  size_t arg_index = 0;
4404  const auto table_func_args = table_function_impl.getInputArgs();
4405  CHECK_EQ(table_func_args.size(), table_function_type_infos.size());
4406  for (const auto& ti : table_function_type_infos) {
4407  if (ti.is_column_list()) {
4408  for (int i = 0; i < ti.get_dimension(); i++) {
4409  auto& input_expr = input_exprs[input_index];
4410  auto col_var = dynamic_cast<Analyzer::ColumnVar*>(input_expr);
4411  CHECK(col_var);
4412 
4413  // avoid setting type info to ti here since ti doesn't have all the
4414  // properties correctly set
4415  auto type_info = input_expr->get_type_info();
4416  type_info.set_subtype(type_info.get_type()); // set type to be subtype
4417  type_info.set_type(ti.get_type()); // set type to column list
4418  type_info.set_dimension(ti.get_dimension());
4419  input_expr->set_type_info(type_info);
4420 
4421  input_col_exprs.push_back(col_var);
4422  input_index++;
4423  }
4424  } else if (ti.is_column()) {
4425  auto& input_expr = input_exprs[input_index];
4426  auto col_var = dynamic_cast<Analyzer::ColumnVar*>(input_expr);
4427  CHECK(col_var);
4428 
4429  // same here! avoid setting type info to ti since it doesn't have all the
4430  // properties correctly set
4431  auto type_info = input_expr->get_type_info();
4432  type_info.set_subtype(type_info.get_type()); // set type to be subtype
4433  type_info.set_type(ti.get_type()); // set type to column
4434  input_expr->set_type_info(type_info);
4435 
4436  input_col_exprs.push_back(col_var);
4437  input_index++;
4438  } else {
4439  auto input_expr = input_exprs[input_index];
4440  auto ext_func_arg_ti = ext_arg_type_to_type_info(table_func_args[arg_index]);
4441  if (ext_func_arg_ti != input_expr->get_type_info()) {
4442  input_exprs[input_index] = input_expr->add_cast(ext_func_arg_ti).get();
4443  }
4444  input_index++;
4445  }
4446  arg_index++;
4447  }
4448  CHECK_EQ(input_col_exprs.size(), rel_table_func->getColInputsSize());
4449  std::vector<Analyzer::Expr*> table_func_outputs;
4450  for (size_t i = 0; i < table_function_impl.getOutputsSize(); i++) {
4451  auto ti = table_function_impl.getOutputSQLType(i);
4452  if (ti.is_dict_encoded_string()) {
4453  auto p = table_function_impl.getInputID(i);
4454 
4455  int32_t input_pos = p.first;
4456  // Iterate over the list of arguments to compute the offset. Use this offset to
4457  // get the corresponding input
4458  int32_t offset = 0;
4459  for (int j = 0; j < input_pos; j++) {
4460  const auto ti = table_function_type_infos[j];
4461  offset += ti.is_column_list() ? ti.get_dimension() : 1;
4462  }
4463  input_pos = offset + p.second;
4464 
4465  CHECK_LT(input_pos, input_exprs.size());
4466  int32_t comp_param = input_exprs_owned[input_pos]->get_type_info().get_comp_param();
4467  ti.set_comp_param(comp_param);
4468  }
4469  target_exprs_owned_.push_back(std::make_shared<Analyzer::ColumnVar>(ti, 0, i, -1));
4470  table_func_outputs.push_back(target_exprs_owned_.back().get());
4471  }
4472  const TableFunctionExecutionUnit exe_unit = {
4473  input_descs,
4474  input_col_descs,
4475  input_exprs, // table function inputs
4476  input_col_exprs, // table function column inputs (duplicates w/ above)
4477  table_func_outputs, // table function projected exprs
4478  output_row_sizing_param, // output buffer sizing param
4479  table_function_impl};
4480  const auto targets_meta = get_targets_meta(rel_table_func, exe_unit.target_exprs);
4481  rel_table_func->setOutputMetainfo(targets_meta);
4482  return {exe_unit, rel_table_func};
4483 }
#define CHECK_EQ(x, y)
Definition: Logger.h:219
std::vector< Analyzer::Expr * > get_raw_pointers(std::vector< std::shared_ptr< Analyzer::Expr >> const &input)
std::unordered_map< const RelAlgNode *, int > get_input_nest_levels(const RelAlgNode *ra_node, const std::vector< size_t > &input_permutation)
#define LOG(tag)
Definition: Logger.h:205
#define UNREACHABLE()
Definition: Logger.h:255
#define CHECK_GT(x, y)
Definition: Logger.h:223
std::string to_string(char const *&&v)
std::vector< std::shared_ptr< Analyzer::Expr > > target_exprs_owned_
const Catalog_Namespace::Catalog & cat_
std::vector< TargetMetaInfo > get_targets_meta(const RA *ra_node, const std::vector< Analyzer::Expr * > &target_exprs)
#define CHECK_LT(x, y)
Definition: Logger.h:221
#define DEFAULT_ROW_MULTIPLIER_VALUE
std::shared_ptr< const query_state::QueryState > query_state_
std::vector< std::shared_ptr< Analyzer::Expr > > translate_scalar_sources(const RA *ra_node, const RelAlgTranslator &translator, const ::ExecutorType executor_type)
const std::tuple< table_functions::TableFunction, std::vector< SQLTypeInfo > > bind_table_function(std::string name, Analyzer::ExpressionPtrVector input_args, const std::vector< table_functions::TableFunction > &table_funcs, const bool is_gpu)
#define CHECK(condition)
Definition: Logger.h:211
std::vector< InputTableInfo > get_table_infos(const std::vector< InputDescriptor > &input_descs, Executor *executor)
Definition: sqltypes.h:45
std::vector< Analyzer::Expr * > target_exprs
std::tuple< std::vector< InputDescriptor >, std::list< std::shared_ptr< const InputColDescriptor > >, std::vector< std::shared_ptr< RexInput > > > get_input_desc(const RA *ra_node, const std::unordered_map< const RelAlgNode *, int > &input_to_nest_level, const std::vector< size_t > &input_permutation, const Catalog_Namespace::Catalog &cat)
Executor * executor_
SQLTypeInfo ext_arg_type_to_type_info(const ExtArgumentType ext_arg_type)

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

RelAlgExecutor::WorkUnit RelAlgExecutor::createUnionWorkUnit ( const RelLogicalUnion logical_union,
const SortInfo sort_info,
const ExecutionOptions eo 
)
private

Definition at line 4217 of file RelAlgExecutor.cpp.

References gpu_enabled::accumulate(), shared::append_move(), cat_, CHECK, RegisteredQueryHint::defaults(), EMPTY_QUERY_PLAN, executor_, g_default_max_groups_buffer_entry_guess, anonymous_namespace{RelAlgExecutor.cpp}::get_input_desc(), anonymous_namespace{RelAlgExecutor.cpp}::get_input_nest_levels(), anonymous_namespace{RelAlgExecutor.cpp}::get_raw_pointers(), get_table_infos(), anonymous_namespace{RelAlgExecutor.cpp}::get_targets_meta(), RelAlgNode::getInput(), RelAlgNode::getOutputMetainfo(), i, RelLogicalUnion::isAll(), ExecutionOptions::just_explain, now_, shared::printContainer(), query_state_, RelAlgNode::setOutputMetainfo(), anonymous_namespace{RelAlgExecutor.cpp}::target_exprs_for_union(), target_exprs_owned_, RelAlgNode::toString(), and VLOG.

Referenced by executeUnion().

4220  {
4221  std::vector<InputDescriptor> input_descs;
4222  std::list<std::shared_ptr<const InputColDescriptor>> input_col_descs;
4223  // Map ra input ptr to index (0, 1).
4224  auto input_to_nest_level = get_input_nest_levels(logical_union, {});
4225  std::tie(input_descs, input_col_descs, std::ignore) =
4226  get_input_desc(logical_union, input_to_nest_level, {}, cat_);
4227  const auto query_infos = get_table_infos(input_descs, executor_);
4228  auto const max_num_tuples =
4229  std::accumulate(query_infos.cbegin(),
4230  query_infos.cend(),
4231  size_t(0),
4232  [](auto max, auto const& query_info) {
4233  return std::max(max, query_info.info.getNumTuples());
4234  });
4235 
4236  VLOG(3) << "input_to_nest_level.size()=" << input_to_nest_level.size() << " Pairs are:";
4237  for (auto& pair : input_to_nest_level) {
4238  VLOG(3) << " (" << pair.first->toString() << ", " << pair.second << ')';
4239  }
4240 
4241  RelAlgTranslator translator(
4242  cat_, query_state_, executor_, input_to_nest_level, {}, now_, eo.just_explain);
4243 
4244  // For UNION queries, we need to keep the target_exprs from both subqueries since they
4245  // may differ on StringDictionaries.
4246  std::vector<Analyzer::Expr*> target_exprs_pair[2];
4247  for (unsigned i = 0; i < 2; ++i) {
4248  auto input_exprs_owned = target_exprs_for_union(logical_union->getInput(i));
4249  CHECK(!input_exprs_owned.empty()) << "No metainfo found for input node(" << i << ") "
4250  << logical_union->getInput(i)->toString();
4251  VLOG(3) << "i(" << i << ") input_exprs_owned.size()=" << input_exprs_owned.size();
4252  for (auto& input_expr : input_exprs_owned) {
4253  VLOG(3) << " " << input_expr->toString();
4254  }
4255  target_exprs_pair[i] = get_raw_pointers(input_exprs_owned);
4256  shared::append_move(target_exprs_owned_, std::move(input_exprs_owned));
4257  }
4258 
4259  VLOG(3) << "input_descs=" << shared::printContainer(input_descs)
4260  << " input_col_descs=" << shared::printContainer(input_col_descs)
4261  << " target_exprs.size()=" << target_exprs_pair[0].size()
4262  << " max_num_tuples=" << max_num_tuples;
4263 
4264  const RelAlgExecutionUnit exe_unit = {input_descs,
4265  input_col_descs,
4266  {}, // quals_cf.simple_quals,
4267  {}, // rewrite_quals(quals_cf.quals),
4268  {},
4269  {nullptr},
4270  target_exprs_pair[0],
4271  nullptr,
4272  sort_info,
4273  max_num_tuples,
4276  {},
4277  {},
4278  false,
4279  logical_union->isAll(),
4280  query_state_,
4281  target_exprs_pair[1]};
4282  auto query_rewriter = std::make_unique<QueryRewriter>(query_infos, executor_);
4283  const auto rewritten_exe_unit = query_rewriter->rewrite(exe_unit);
4284 
4285  RelAlgNode const* input0 = logical_union->getInput(0);
4286  if (auto const* node = dynamic_cast<const RelCompound*>(input0)) {
4287  logical_union->setOutputMetainfo(
4288  get_targets_meta(node, rewritten_exe_unit.target_exprs));
4289  } else if (auto const* node = dynamic_cast<const RelProject*>(input0)) {
4290  logical_union->setOutputMetainfo(
4291  get_targets_meta(node, rewritten_exe_unit.target_exprs));
4292  } else if (auto const* node = dynamic_cast<const RelLogicalUnion*>(input0)) {
4293  logical_union->setOutputMetainfo(
4294  get_targets_meta(node, rewritten_exe_unit.target_exprs));
4295  } else if (auto const* node = dynamic_cast<const RelAggregate*>(input0)) {
4296  logical_union->setOutputMetainfo(
4297  get_targets_meta(node, rewritten_exe_unit.target_exprs));
4298  } else if (auto const* node = dynamic_cast<const RelScan*>(input0)) {
4299  logical_union->setOutputMetainfo(
4300  get_targets_meta(node, rewritten_exe_unit.target_exprs));
4301  } else if (auto const* node = dynamic_cast<const RelFilter*>(input0)) {
4302  logical_union->setOutputMetainfo(
4303  get_targets_meta(node, rewritten_exe_unit.target_exprs));
4304  } else if (dynamic_cast<const RelSort*>(input0)) {
4305  throw QueryNotSupported("LIMIT and OFFSET are not currently supported with UNION.");
4306  } else {
4307  throw QueryNotSupported("Unsupported input type: " + input0->toString());
4308  }
4309  VLOG(3) << "logical_union->getOutputMetainfo()="
4310  << shared::printContainer(logical_union->getOutputMetainfo())
4311  << " rewritten_exe_unit.input_col_descs.front()->getScanDesc().getTableId()="
4312  << rewritten_exe_unit.input_col_descs.front()->getScanDesc().getTableId();
4313 
4314  return {rewritten_exe_unit,
4315  logical_union,
4317  std::move(query_rewriter)};
4318 }
bool isAll() const
std::vector< Analyzer::Expr * > get_raw_pointers(std::vector< std::shared_ptr< Analyzer::Expr >> const &input)
std::unordered_map< const RelAlgNode *, int > get_input_nest_levels(const RelAlgNode *ra_node, const std::vector< size_t > &input_permutation)
size_t append_move(std::vector< T > &destination, std::vector< T > &&source)
Definition: misc.h:74
std::vector< std::shared_ptr< Analyzer::Expr > > target_exprs_owned_
const Catalog_Namespace::Catalog & cat_
const RelAlgNode * getInput(const size_t idx) const
DEVICE auto accumulate(ARGS &&...args)
Definition: gpu_enabled.h:42
std::vector< std::shared_ptr< Analyzer::Expr > > target_exprs_for_union(RelAlgNode const *input_node)
std::vector< TargetMetaInfo > get_targets_meta(const RA *ra_node, const std::vector< Analyzer::Expr * > &target_exprs)
static RegisteredQueryHint defaults()
Definition: QueryHint.h:222
std::shared_ptr< const query_state::QueryState > query_state_
size_t g_default_max_groups_buffer_entry_guess
Definition: Execute.cpp:105
virtual std::string toString() const =0
constexpr char const * EMPTY_QUERY_PLAN
#define CHECK(condition)
Definition: Logger.h:211
std::vector< InputTableInfo > get_table_infos(const std::vector< InputDescriptor > &input_descs, Executor *executor)
void setOutputMetainfo(const std::vector< TargetMetaInfo > &targets_metainfo) const
PrintContainer< CONTAINER > printContainer(CONTAINER &container)
Definition: misc.h:104
std::tuple< std::vector< InputDescriptor >, std::list< std::shared_ptr< const InputColDescriptor > >, std::vector< std::shared_ptr< RexInput > > > get_input_desc(const RA *ra_node, const std::unordered_map< const RelAlgNode *, int > &input_to_nest_level, const std::vector< size_t > &input_permutation, const Catalog_Namespace::Catalog &cat)
const std::vector< TargetMetaInfo > & getOutputMetainfo() const
Executor * executor_
#define VLOG(n)
Definition: Logger.h:305

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

std::unique_ptr< WindowFunctionContext > RelAlgExecutor::createWindowFunctionContext ( const Analyzer::WindowFunction window_func,
const std::shared_ptr< Analyzer::BinOper > &  partition_key_cond,
const RelAlgExecutionUnit ra_exe_unit,
const std::vector< InputTableInfo > &  query_infos,
const CompilationOptions co,
ColumnCacheMap column_cache_map,
std::shared_ptr< RowSetMemoryOwner row_set_mem_owner 
)
private

Definition at line 2072 of file RelAlgExecutor.cpp.

References CHECK, CHECK_EQ, Data_Namespace::CPU_LEVEL, CompilationOptions::device_type, executor_, ColumnFetcher::getOneColumnFragment(), Analyzer::WindowFunction::getOrderKeys(), GPU, Data_Namespace::GPU_LEVEL, RelAlgExecutionUnit::hash_table_build_plan_dag, INVALID, OneToMany, RelAlgExecutionUnit::query_hint, and RelAlgExecutionUnit::table_id_to_node_map.

Referenced by computeWindow().

2079  {
2080  const size_t elem_count = query_infos.front().info.fragments.front().getNumTuples();
2081  const auto memory_level = co.device_type == ExecutorDeviceType::GPU
2084  std::unique_ptr<WindowFunctionContext> context;
2085  if (partition_key_cond) {
2086  const auto join_table_or_err =
2087  executor_->buildHashTableForQualifier(partition_key_cond,
2088  query_infos,
2089  memory_level,
2090  JoinType::INVALID, // for window function
2092  column_cache_map,
2093  ra_exe_unit.hash_table_build_plan_dag,
2094  ra_exe_unit.query_hint,
2095  ra_exe_unit.table_id_to_node_map);
2096  if (!join_table_or_err.fail_reason.empty()) {
2097  throw std::runtime_error(join_table_or_err.fail_reason);
2098  }
2099  CHECK(join_table_or_err.hash_table->getHashType() == HashType::OneToMany);
2100  context = std::make_unique<WindowFunctionContext>(window_func,
2101  join_table_or_err.hash_table,
2102  elem_count,
2103  co.device_type,
2104  row_set_mem_owner);
2105  } else {
2106  context = std::make_unique<WindowFunctionContext>(
2107  window_func, elem_count, co.device_type, row_set_mem_owner);
2108  }
2109  const auto& order_keys = window_func->getOrderKeys();
2110  std::vector<std::shared_ptr<Chunk_NS::Chunk>> chunks_owner;
2111  for (const auto& order_key : order_keys) {
2112  const auto order_col =
2113  std::dynamic_pointer_cast<const Analyzer::ColumnVar>(order_key);
2114  if (!order_col) {
2115  throw std::runtime_error("Only order by columns supported for now");
2116  }
2117  const int8_t* column;
2118  size_t join_col_elem_count;
2119  std::tie(column, join_col_elem_count) =
2121  *order_col,
2122  query_infos.front().info.fragments.front(),
2123  memory_level,
2124  0,
2125  nullptr,
2126  /*thread_idx=*/0,
2127  chunks_owner,
2128  column_cache_map);
2129 
2130  CHECK_EQ(join_col_elem_count, elem_count);
2131  context->addOrderColumn(column, order_col.get(), chunks_owner);
2132  }
2133  return context;
2134 }
#define CHECK_EQ(x, y)
Definition: Logger.h:219
const std::vector< std::shared_ptr< Analyzer::Expr > > & getOrderKeys() const
Definition: Analyzer.h:1615
TableIdToNodeMap table_id_to_node_map
ExecutorDeviceType device_type
static std::pair< const int8_t *, size_t > getOneColumnFragment(Executor *executor, const Analyzer::ColumnVar &hash_col, const Fragmenter_Namespace::FragmentInfo &fragment, const Data_Namespace::MemoryLevel effective_mem_lvl, const int device_id, DeviceAllocator *device_allocator, const size_t thread_idx, std::vector< std::shared_ptr< Chunk_NS::Chunk >> &chunks_owner, ColumnCacheMap &column_cache)
Gets one chunk's pointer and element count on either CPU or GPU.
RegisteredQueryHint query_hint
#define CHECK(condition)
Definition: Logger.h:211
Executor * executor_
HashTableBuildDagMap hash_table_build_plan_dag

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

RelAlgExecutor::WorkUnit RelAlgExecutor::createWorkUnit ( const RelAlgNode node,
const SortInfo sort_info,
const ExecutionOptions eo 
)
private

Definition at line 3527 of file RelAlgExecutor.cpp.

References createAggregateWorkUnit(), createCompoundWorkUnit(), createFilterWorkUnit(), createProjectWorkUnit(), logger::FATAL, ExecutionOptions::just_explain, LOG, and RelAlgNode::toString().

Referenced by createSortInputWorkUnit(), and getJoinInfo().

3529  {
3530  const auto compound = dynamic_cast<const RelCompound*>(node);
3531  if (compound) {
3532  return createCompoundWorkUnit(compound, sort_info, eo);
3533  }
3534  const auto project = dynamic_cast<const RelProject*>(node);
3535  if (project) {
3536  return createProjectWorkUnit(project, sort_info, eo);
3537  }
3538  const auto aggregate = dynamic_cast<const RelAggregate*>(node);
3539  if (aggregate) {
3540  return createAggregateWorkUnit(aggregate, sort_info, eo.just_explain);
3541  }
3542  const auto filter = dynamic_cast<const RelFilter*>(node);
3543  if (filter) {
3544  return createFilterWorkUnit(filter, sort_info, eo.just_explain);
3545  }
3546  LOG(FATAL) << "Unhandled node type: " << node->toString();
3547  return {};
3548 }
WorkUnit createProjectWorkUnit(const RelProject *, const SortInfo &, const ExecutionOptions &eo)
WorkUnit createAggregateWorkUnit(const RelAggregate *, const SortInfo &, const bool just_explain)
#define LOG(tag)
Definition: Logger.h:205
WorkUnit createCompoundWorkUnit(const RelCompound *, const SortInfo &, const ExecutionOptions &eo)
virtual std::string toString() const =0
WorkUnit createFilterWorkUnit(const RelFilter *, const SortInfo &, const bool just_explain)

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void RelAlgExecutor::eraseFromTemporaryTables ( const int  table_id)
inlineprivate

Definition at line 363 of file RelAlgExecutor.h.

References temporary_tables_.

363 { temporary_tables_.erase(table_id); }
TemporaryTables temporary_tables_
ExecutionResult RelAlgExecutor::executeAggregate ( const RelAggregate aggregate,
const CompilationOptions co,
const ExecutionOptions eo,
RenderInfo render_info,
const int64_t  queue_time_ms 
)
private

Definition at line 1882 of file RelAlgExecutor.cpp.

References createAggregateWorkUnit(), DEBUG_TIMER, Default, executeWorkUnit(), RelAlgNode::getOutputMetainfo(), and ExecutionOptions::just_explain.

Referenced by executeRelAlgStep().

1886  {
1887  auto timer = DEBUG_TIMER(__func__);
1888  const auto work_unit = createAggregateWorkUnit(
1889  aggregate, {{}, SortAlgorithm::Default, 0, 0}, eo.just_explain);
1890  return executeWorkUnit(work_unit,
1891  aggregate->getOutputMetainfo(),
1892  true,
1893  co,
1894  eo,
1895  render_info,
1896  queue_time_ms);
1897 }
WorkUnit createAggregateWorkUnit(const RelAggregate *, const SortInfo &, const bool just_explain)
ExecutionResult executeWorkUnit(const WorkUnit &work_unit, const std::vector< TargetMetaInfo > &targets_meta, const bool is_agg, const CompilationOptions &co_in, const ExecutionOptions &eo_in, RenderInfo *, const int64_t queue_time_ms, const std::optional< size_t > previous_count=std::nullopt)
#define DEBUG_TIMER(name)
Definition: Logger.h:358
const std::vector< TargetMetaInfo > & getOutputMetainfo() const

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

ExecutionResult RelAlgExecutor::executeCompound ( const RelCompound compound,
const CompilationOptions co,
const ExecutionOptions eo,
RenderInfo render_info,
const int64_t  queue_time_ms 
)
private

Definition at line 1864 of file RelAlgExecutor.cpp.

References createCompoundWorkUnit(), DEBUG_TIMER, Default, executeWorkUnit(), RelAlgNode::getOutputMetainfo(), and RelCompound::isAggregate().

Referenced by executeRelAlgStep().

1868  {
1869  auto timer = DEBUG_TIMER(__func__);
1870  const auto work_unit =
1871  createCompoundWorkUnit(compound, {{}, SortAlgorithm::Default, 0, 0}, eo);
1872  CompilationOptions co_compound = co;
1873  return executeWorkUnit(work_unit,
1874  compound->getOutputMetainfo(),
1875  compound->isAggregate(),
1876  co_compound,
1877  eo,
1878  render_info,
1879  queue_time_ms);
1880 }
WorkUnit createCompoundWorkUnit(const RelCompound *, const SortInfo &, const ExecutionOptions &eo)
ExecutionResult executeWorkUnit(const WorkUnit &work_unit, const std::vector< TargetMetaInfo > &targets_meta, const bool is_agg, const CompilationOptions &co_in, const ExecutionOptions &eo_in, RenderInfo *, const int64_t queue_time_ms, const std::optional< size_t > previous_count=std::nullopt)
bool isAggregate() const
#define DEBUG_TIMER(name)
Definition: Logger.h:358
const std::vector< TargetMetaInfo > & getOutputMetainfo() const

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void RelAlgExecutor::executeDelete ( const RelAlgNode node,
const CompilationOptions co,
const ExecutionOptions eo_in,
const int64_t  queue_time_ms 
)
private

Definition at line 1763 of file RelAlgExecutor.cpp.

References cat_, CHECK, CHECK_EQ, createCompoundWorkUnit(), createProjectWorkUnit(), DEBUG_TIMER, Default, dml_transaction_parameters_, executor_, CompilationOptions::filter_on_deleted_column, get_table_infos(), get_temporary_table(), Catalog_Namespace::Catalog::getDeletedColumn(), QueryExecutionError::getErrorCode(), getErrorMessageFromCode(), CacheInvalidator< CACHE_HOLDING_TYPES >::invalidateCaches(), CompilationOptions::makeCpuOnly(), ExecutionOptions::output_columnar_hint, post_execution_callback_, table_is_temporary(), temporary_tables_, and StorageIOFacility::yieldDeleteCallback().

Referenced by executeRelAlgStep().

1766  {
1767  CHECK(node);
1768  auto timer = DEBUG_TIMER(__func__);
1769 
1771 
1772  auto execute_delete_for_node = [this, &co, &eo_in](const auto node,
1773  auto& work_unit,
1774  const bool is_aggregate) {
1775  auto* table_descriptor = node->getModifiedTableDescriptor();
1776  CHECK(table_descriptor);
1777  if (!table_descriptor->hasDeletedCol) {
1778  throw std::runtime_error(
1779  "DELETE queries are only supported on tables with the vacuum attribute set to "
1780  "'delayed'");
1781  }
1782 
1783  const auto table_infos = get_table_infos(work_unit.exe_unit, executor_);
1784 
1785  auto execute_delete_ra_exe_unit =
1786  [this, &table_infos, &table_descriptor, &eo_in, &co](const auto& exe_unit,
1787  const bool is_aggregate) {
1789  std::make_unique<DeleteTransactionParameters>(table_descriptor);
1790  auto delete_params = dynamic_cast<DeleteTransactionParameters*>(
1792  CHECK(delete_params);
1793  auto delete_callback = yieldDeleteCallback(*delete_params);
1795 
1796  auto eo = eo_in;
1797  if (dml_transaction_parameters_->tableIsTemporary()) {
1798  eo.output_columnar_hint = true;
1799  co_delete.filter_on_deleted_column =
1800  false; // project the entire delete column for columnar update
1801  } else {
1802  CHECK_EQ(exe_unit.target_exprs.size(), size_t(1));
1803  }
1804 
1805  try {
1806  auto table_update_metadata =
1807  executor_->executeUpdate(exe_unit,
1808  table_infos,
1809  table_descriptor,
1810  co_delete,
1811  eo,
1812  cat_,
1813  executor_->row_set_mem_owner_,
1814  delete_callback,
1815  is_aggregate);
1816  post_execution_callback_ = [table_update_metadata, this]() {
1817  dml_transaction_parameters_->finalizeTransaction(cat_);
1818  TableOptimizer table_optimizer{
1819  dml_transaction_parameters_->getTableDescriptor(), executor_, cat_};
1820  table_optimizer.vacuumFragmentsAboveMinSelectivity(table_update_metadata);
1821  };
1822  } catch (const QueryExecutionError& e) {
1823  throw std::runtime_error(getErrorMessageFromCode(e.getErrorCode()));
1824  }
1825  };
1826 
1827  if (table_is_temporary(table_descriptor)) {
1828  auto query_rewrite = std::make_unique<QueryRewriter>(table_infos, executor_);
1829  auto cd = cat_.getDeletedColumn(table_descriptor);
1830  CHECK(cd);
1831  auto delete_column_expr = makeExpr<Analyzer::ColumnVar>(
1832  cd->columnType, table_descriptor->tableId, cd->columnId, 0);
1833  const auto rewritten_exe_unit =
1834  query_rewrite->rewriteColumnarDelete(work_unit.exe_unit, delete_column_expr);
1835  execute_delete_ra_exe_unit(rewritten_exe_unit, is_aggregate);
1836  } else {
1837  execute_delete_ra_exe_unit(work_unit.exe_unit, is_aggregate);
1838  }
1839  };
1840 
1841  if (auto compound = dynamic_cast<const RelCompound*>(node)) {
1842  const auto work_unit =
1843  createCompoundWorkUnit(compound, {{}, SortAlgorithm::Default, 0, 0}, eo_in);
1844  execute_delete_for_node(compound, work_unit, compound->isAggregate());
1845  } else if (auto project = dynamic_cast<const RelProject*>(node)) {
1846  auto work_unit =
1847  createProjectWorkUnit(project, {{}, SortAlgorithm::Default, 0, 0}, eo_in);
1848  if (project->isSimple()) {
1849  CHECK_EQ(size_t(1), project->inputCount());
1850  const auto input_ra = project->getInput(0);
1851  if (dynamic_cast<const RelSort*>(input_ra)) {
1852  const auto& input_table =
1853  get_temporary_table(&temporary_tables_, -input_ra->getId());
1854  CHECK(input_table);
1855  work_unit.exe_unit.scan_limit = input_table->rowCount();
1856  }
1857  }
1858  execute_delete_for_node(project, work_unit, false);
1859  } else {
1860  throw std::runtime_error("Unsupported parent node for delete: " + node->toString());
1861  }
1862 }
#define CHECK_EQ(x, y)
Definition: Logger.h:219
std::optional< std::function< void()> > post_execution_callback_
int32_t getErrorCode() const
Definition: ErrorHandling.h:55
WorkUnit createProjectWorkUnit(const RelProject *, const SortInfo &, const ExecutionOptions &eo)
const ColumnDescriptor * getDeletedColumn(const TableDescriptor *td) const
Definition: Catalog.cpp:3183
StorageIOFacility::UpdateCallback yieldDeleteCallback(DeleteTransactionParameters &delete_parameters)
TemporaryTables temporary_tables_
Driver for running cleanup processes on a table. TableOptimizer provides functions for various cleanu...
static void invalidateCaches()
WorkUnit createCompoundWorkUnit(const RelCompound *, const SortInfo &, const ExecutionOptions &eo)
const ResultSetPtr & get_temporary_table(const TemporaryTables *temporary_tables, const int table_id)
Definition: Execute.h:228
static CompilationOptions makeCpuOnly(const CompilationOptions &in)
const Catalog_Namespace::Catalog & cat_
bool table_is_temporary(const TableDescriptor *const td)
#define CHECK(condition)
Definition: Logger.h:211
std::vector< InputTableInfo > get_table_infos(const std::vector< InputDescriptor > &input_descs, Executor *executor)
#define DEBUG_TIMER(name)
Definition: Logger.h:358
static std::string getErrorMessageFromCode(const int32_t error_code)
std::unique_ptr< TransactionParameters > dml_transaction_parameters_
Executor * executor_

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

ExecutionResult RelAlgExecutor::executeFilter ( const RelFilter filter,
const CompilationOptions co,
const ExecutionOptions eo,
RenderInfo render_info,
const int64_t  queue_time_ms 
)
private

Definition at line 2136 of file RelAlgExecutor.cpp.

References createFilterWorkUnit(), DEBUG_TIMER, Default, executeWorkUnit(), RelAlgNode::getOutputMetainfo(), and ExecutionOptions::just_explain.

Referenced by executeRelAlgStep().

2140  {
2141  auto timer = DEBUG_TIMER(__func__);
2142  const auto work_unit =
2143  createFilterWorkUnit(filter, {{}, SortAlgorithm::Default, 0, 0}, eo.just_explain);
2144  return executeWorkUnit(
2145  work_unit, filter->getOutputMetainfo(), false, co, eo, render_info, queue_time_ms);
2146 }
ExecutionResult executeWorkUnit(const WorkUnit &work_unit, const std::vector< TargetMetaInfo > &targets_meta, const bool is_agg, const CompilationOptions &co_in, const ExecutionOptions &eo_in, RenderInfo *, const int64_t queue_time_ms, const std::optional< size_t > previous_count=std::nullopt)
#define DEBUG_TIMER(name)
Definition: Logger.h:358
const std::vector< TargetMetaInfo > & getOutputMetainfo() const
WorkUnit createFilterWorkUnit(const RelFilter *, const SortInfo &, const bool just_explain)

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

ExecutionResult RelAlgExecutor::executeLogicalValues ( const RelLogicalValues logical_values,
const ExecutionOptions eo 
)
private

Definition at line 2192 of file RelAlgExecutor.cpp.

References CPU, DEBUG_TIMER, executor_, RelLogicalValues::getNumRows(), RelLogicalValues::getTupleType(), i, kBIGINT, kCOUNT, kNULLT, Projection, query_mem_desc, and RelAlgNode::setOutputMetainfo().

Referenced by executeRelAlgStep().

2194  {
2195  auto timer = DEBUG_TIMER(__func__);
2197  logical_values->getNumRows(),
2199  /*is_table_function=*/false);
2200 
2201  auto tuple_type = logical_values->getTupleType();
2202  for (size_t i = 0; i < tuple_type.size(); ++i) {
2203  auto& target_meta_info = tuple_type[i];
2204  if (target_meta_info.get_type_info().is_varlen()) {
2205  throw std::runtime_error("Variable length types not supported in VALUES yet.");
2206  }
2207  if (target_meta_info.get_type_info().get_type() == kNULLT) {
2208  // replace w/ bigint
2209  tuple_type[i] =
2210  TargetMetaInfo(target_meta_info.get_resname(), SQLTypeInfo(kBIGINT, false));
2211  }
2212  query_mem_desc.addColSlotInfo(
2213  {std::make_tuple(tuple_type[i].get_type_info().get_size(), 8)});
2214  }
2215  logical_values->setOutputMetainfo(tuple_type);
2216 
2217  std::vector<TargetInfo> target_infos;
2218  for (const auto& tuple_type_component : tuple_type) {
2219  target_infos.emplace_back(TargetInfo{false,
2220  kCOUNT,
2221  tuple_type_component.get_type_info(),
2222  SQLTypeInfo(kNULLT, false),
2223  false,
2224  false,
2225  /*is_varlen_projection=*/false});
2226  }
2227 
2228  std::shared_ptr<ResultSet> rs{
2229  ResultSetLogicalValuesBuilder{logical_values,
2230  target_infos,
2233  executor_->getRowSetMemoryOwner(),
2234  executor_}
2235  .build()};
2236 
2237  return {rs, tuple_type};
2238 }
size_t getNumRows() const
const std::vector< TargetMetaInfo > getTupleType() const
Definition: sqldefs.h:76
#define DEBUG_TIMER(name)
Definition: Logger.h:358
void setOutputMetainfo(const std::vector< TargetMetaInfo > &targets_metainfo) const
Executor * executor_

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

ExecutionResult RelAlgExecutor::executeModify ( const RelModify modify,
const ExecutionOptions eo 
)
private

Definition at line 2288 of file RelAlgExecutor.cpp.

References CPU, DEBUG_TIMER, executor_, and ExecutionOptions::just_explain.

Referenced by executeRelAlgStep().

2289  {
2290  auto timer = DEBUG_TIMER(__func__);
2291  if (eo.just_explain) {
2292  throw std::runtime_error("EXPLAIN not supported for ModifyTable");
2293  }
2294 
2295  auto rs = std::make_shared<ResultSet>(TargetInfoList{},
2298  executor_->getRowSetMemoryOwner(),
2299  executor_->getCatalog(),
2300  executor_->blockSize(),
2301  executor_->gridSize());
2302 
2303  std::vector<TargetMetaInfo> empty_targets;
2304  return {rs, empty_targets};
2305 }
std::vector< TargetInfo > TargetInfoList
#define DEBUG_TIMER(name)
Definition: Logger.h:358
Executor * executor_

+ Here is the caller graph for this function:

void RelAlgExecutor::executePostExecutionCallback ( )

Definition at line 3520 of file RelAlgExecutor.cpp.

References post_execution_callback_, and VLOG.

3520  {
3522  VLOG(1) << "Running post execution callback.";
3523  (*post_execution_callback_)();
3524  }
3525 }
std::optional< std::function< void()> > post_execution_callback_
#define VLOG(n)
Definition: Logger.h:305
ExecutionResult RelAlgExecutor::executeProject ( const RelProject project,
const CompilationOptions co,
const ExecutionOptions eo,
RenderInfo render_info,
const int64_t  queue_time_ms,
const std::optional< size_t >  previous_count 
)
private

Definition at line 1912 of file RelAlgExecutor.cpp.

References CHECK, CHECK_EQ, CPU, createProjectWorkUnit(), DEBUG_TIMER, Default, executeWorkUnit(), get_temporary_table(), RelAlgNode::getInput(), RelAlgNode::getOutputMetainfo(), RelAlgNode::inputCount(), RelProject::isSimple(), and temporary_tables_.

Referenced by executeRelAlgStep().

1918  {
1919  auto timer = DEBUG_TIMER(__func__);
1920  auto work_unit = createProjectWorkUnit(project, {{}, SortAlgorithm::Default, 0, 0}, eo);
1921  CompilationOptions co_project = co;
1922  if (project->isSimple()) {
1923  CHECK_EQ(size_t(1), project->inputCount());
1924  const auto input_ra = project->getInput(0);
1925  if (dynamic_cast<const RelSort*>(input_ra)) {
1926  co_project.device_type = ExecutorDeviceType::CPU;
1927  const auto& input_table =
1928  get_temporary_table(&temporary_tables_, -input_ra->getId());
1929  CHECK(input_table);
1930  work_unit.exe_unit.scan_limit =
1931  std::min(input_table->getLimit(), input_table->rowCount());
1932  }
1933  }
1934  return executeWorkUnit(work_unit,
1935  project->getOutputMetainfo(),
1936  false,
1937  co_project,
1938  eo,
1939  render_info,
1940  queue_time_ms,
1941  previous_count);
1942 }
#define CHECK_EQ(x, y)
Definition: Logger.h:219
WorkUnit createProjectWorkUnit(const RelProject *, const SortInfo &, const ExecutionOptions &eo)
TemporaryTables temporary_tables_
const ResultSetPtr & get_temporary_table(const TemporaryTables *temporary_tables, const int table_id)
Definition: Execute.h:228
const RelAlgNode * getInput(const size_t idx) const
bool isSimple() const
ExecutionResult executeWorkUnit(const WorkUnit &work_unit, const std::vector< TargetMetaInfo > &targets_meta, const bool is_agg, const CompilationOptions &co_in, const ExecutionOptions &eo_in, RenderInfo *, const int64_t queue_time_ms, const std::optional< size_t > previous_count=std::nullopt)
#define CHECK(condition)
Definition: Logger.h:211
#define DEBUG_TIMER(name)
Definition: Logger.h:358
const size_t inputCount() const
const std::vector< TargetMetaInfo > & getOutputMetainfo() const

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

ExecutionResult RelAlgExecutor::executeRelAlgQuery ( const CompilationOptions co,
const ExecutionOptions eo,
const bool  just_explain_plan,
RenderInfo render_info 
)

Definition at line 308 of file RelAlgExecutor.cpp.

References CHECK, DEBUG_TIMER, executeRelAlgQueryNoRetry(), g_allow_cpu_retry, logger::INFO, INJECT_TIMER, LOG, CompilationOptions::makeCpuOnly(), post_execution_callback_, query_dag_, run_benchmark::run_query(), RenderInfo::setForceNonInSituData(), and VLOG.

311  {
312  CHECK(query_dag_);
313  auto timer = DEBUG_TIMER(__func__);
315 
316  auto run_query = [&](const CompilationOptions& co_in) {
317  auto execution_result =
318  executeRelAlgQueryNoRetry(co_in, eo, just_explain_plan, render_info);
320  VLOG(1) << "Running post execution callback.";
321  (*post_execution_callback_)();
322  }
323  return execution_result;
324  };
325 
326  try {
327  return run_query(co);
328  } catch (const QueryMustRunOnCpu&) {
329  if (!g_allow_cpu_retry) {
330  throw;
331  }
332  }
333  LOG(INFO) << "Query unable to run in GPU mode, retrying on CPU";
334  auto co_cpu = CompilationOptions::makeCpuOnly(co);
335 
336  if (render_info) {
337  render_info->setForceNonInSituData();
338  }
339  return run_query(co_cpu);
340 }
std::optional< std::function< void()> > post_execution_callback_
void setForceNonInSituData()
Definition: RenderInfo.cpp:44
#define LOG(tag)
Definition: Logger.h:205
static CompilationOptions makeCpuOnly(const CompilationOptions &in)
#define INJECT_TIMER(DESC)
Definition: measure.h:93
ExecutionResult executeRelAlgQueryNoRetry(const CompilationOptions &co, const ExecutionOptions &eo, const bool just_explain_plan, RenderInfo *render_info)
ExecutionResult executeRelAlgQuery(const CompilationOptions &co, const ExecutionOptions &eo, const bool just_explain_plan, RenderInfo *render_info)
#define CHECK(condition)
Definition: Logger.h:211
#define DEBUG_TIMER(name)
Definition: Logger.h:358
std::unique_ptr< RelAlgDagBuilder > query_dag_
bool g_allow_cpu_retry
Definition: Execute.cpp:82
#define VLOG(n)
Definition: Logger.h:305

+ Here is the call graph for this function:

ExecutionResult RelAlgExecutor::executeRelAlgQueryNoRetry ( const CompilationOptions co,
const ExecutionOptions eo,
const bool  just_explain_plan,
RenderInfo render_info 
)
private

Definition at line 342 of file RelAlgExecutor.cpp.

References ExecutionOptions::allow_runtime_query_interrupt, cat_, CHECK, cleanupPostExecution(), DEBUG_TIMER, RenderInfo::disallow_in_situ_only_if_final_ED_is_aggregate, Executor::ERR_INTERRUPTED, executeRelAlgQueryWithFilterPushDown(), executeRelAlgSeq(), executor_, ExecutionOptions::find_push_down_candidates, g_enable_dynamic_watchdog, get_physical_inputs(), get_physical_table_inputs(), QueryExecutionError::getErrorCode(), getSubqueries(), i, INJECT_TIMER, join(), ExecutionOptions::just_calcite_explain, ExecutionOptions::just_explain, ExecutionOptions::just_validate, anonymous_namespace{RelAlgExecutor.cpp}::prepare_for_system_table_execution(), anonymous_namespace{RelAlgExecutor.cpp}::prepare_foreign_table_for_execution(), query_dag_, query_state_, run_benchmark_import::result, gpu_enabled::reverse(), RenderInfo::setInSituDataIfUnset(), gpu_enabled::sort(), timer_start(), timer_stop(), and to_string().

Referenced by executeRelAlgQuery().

345  {
347  auto timer = DEBUG_TIMER(__func__);
348  auto timer_setup = DEBUG_TIMER("Query pre-execution steps");
349 
350  query_dag_->resetQueryExecutionState();
351  const auto& ra = query_dag_->getRootNode();
352 
353  // capture the lock acquisition time
354  auto clock_begin = timer_start();
356  executor_->resetInterrupt();
357  }
358  std::string query_session{""};
359  std::string query_str{"N/A"};
360  std::string query_submitted_time{""};
361  // gather necessary query's info
362  if (query_state_ != nullptr && query_state_->getConstSessionInfo() != nullptr) {
363  query_session = query_state_->getConstSessionInfo()->get_session_id();
364  query_str = query_state_->getQueryStr();
365  query_submitted_time = query_state_->getQuerySubmittedTime();
366  }
367 
368  auto validate_or_explain_query =
369  just_explain_plan || eo.just_validate || eo.just_explain || eo.just_calcite_explain;
370  auto interruptable = !render_info && !query_session.empty() &&
371  eo.allow_runtime_query_interrupt && !validate_or_explain_query;
372  if (interruptable) {
373  // if we reach here, the current query, which was waiting for an idle executor
374  // in the dispatch queue, is now scheduled on a specific executor
375  // (not UNITARY_EXECUTOR),
376  // so we update the query session's status with the executor that takes this query
377  std::tie(query_session, query_str) = executor_->attachExecutorToQuerySession(
378  query_session, query_str, query_submitted_time);
379 
380  // now the query is going to be executed, so update the status as
381  // "RUNNING_QUERY_KERNEL"
382  executor_->updateQuerySessionStatus(
383  query_session,
384  query_submitted_time,
385  QuerySessionStatus::QueryStatus::RUNNING_QUERY_KERNEL);
386  }
387 
388  // so it should clean up the session info after finishing its execution
389  ScopeGuard clearQuerySessionInfo =
390  [this, &query_session, &interruptable, &query_submitted_time] {
391  // reset the runtime query interrupt status after the end of query execution
392  if (interruptable) {
393  // cleanup running session's info
394  executor_->clearQuerySessionStatus(query_session, query_submitted_time);
395  }
396  };
397 
398  auto acquire_execute_mutex = [](Executor * executor) -> auto {
399  auto ret = executor->acquireExecuteMutex();
400  return ret;
401  };
402  // now we acquire the executor lock here to make sure that this executor holds
403  // all necessary resources and, at the same time, protects them against other executors
404  auto lock = acquire_execute_mutex(executor_);
405 
406  if (interruptable) {
407  // check whether this query session has "already" been interrupted;
408  // this case occurs when there is a very short gap between being interrupted and
409  // taking the execute lock.
410  // if so, we have to remove "all" queries initiated by this session, and we do so here
411  // without running the query
412  try {
413  executor_->checkPendingQueryStatus(query_session);
414  } catch (QueryExecutionError& e) {
416  throw std::runtime_error("Query execution has been interrupted (pending query)");
417  }
418  throw e;
419  } catch (...) {
420  throw std::runtime_error("Checking pending query status failed: unknown error");
421  }
422  }
423  int64_t queue_time_ms = timer_stop(clock_begin);
424 
426 
427  // Notify foreign tables to load prior to caching
429 
430  ScopeGuard row_set_holder = [this] { cleanupPostExecution(); };
431  const auto phys_inputs = get_physical_inputs(cat_, &ra);
432  const auto phys_table_ids = get_physical_table_inputs(&ra);
433  executor_->setCatalog(&cat_);
434  executor_->setupCaching(phys_inputs, phys_table_ids);
435 
436  ScopeGuard restore_metainfo_cache = [this] { executor_->clearMetaInfoCache(); };
437  auto ed_seq = RaExecutionSequence(&ra);
438 
439  if (just_explain_plan) {
440  std::stringstream ss;
441  std::vector<const RelAlgNode*> nodes;
442  for (size_t i = 0; i < ed_seq.size(); i++) {
443  nodes.emplace_back(ed_seq.getDescriptor(i)->getBody());
444  }
445  size_t ctr = nodes.size();
446  size_t tab_ctr = 0;
447  for (auto& body : boost::adaptors::reverse(nodes)) {
448  const auto index = ctr--;
449  const auto tabs = std::string(tab_ctr++, '\t');
450  CHECK(body);
451  ss << tabs << std::to_string(index) << " : " << body->toString() << "\n";
452  if (auto sort = dynamic_cast<const RelSort*>(body)) {
453  ss << tabs << " : " << sort->getInput(0)->toString() << "\n";
454  }
455  if (dynamic_cast<const RelProject*>(body) ||
456  dynamic_cast<const RelCompound*>(body)) {
457  if (auto join = dynamic_cast<const RelLeftDeepInnerJoin*>(body->getInput(0))) {
458  ss << tabs << " : " << join->toString() << "\n";
459  }
460  }
461  }
462  const auto& subqueries = getSubqueries();
463  if (!subqueries.empty()) {
464  ss << "Subqueries: "
465  << "\n";
466  for (const auto& subquery : subqueries) {
467  const auto ra = subquery->getRelAlg();
468  ss << "\t" << ra->toString() << "\n";
469  }
470  }
471  auto rs = std::make_shared<ResultSet>(ss.str());
472  return {rs, {}};
473  }
474 
475  if (render_info) {
476  // set render to be non-insitu in certain situations.
478  ed_seq.size() > 1) {
479  // old logic
480  // disallow if more than one ED
481  render_info->setInSituDataIfUnset(false);
482  }
483  }
484 
485  if (eo.find_push_down_candidates) {
486  // this extra logic is mainly due to current limitations on multi-step queries
487  // and/or subqueries.
489  ed_seq, co, eo, render_info, queue_time_ms);
490  }
491  timer_setup.stop();
492 
493  // Dispatch the subqueries first
494  for (auto subquery : getSubqueries()) {
495  const auto subquery_ra = subquery->getRelAlg();
496  CHECK(subquery_ra);
497  if (subquery_ra->hasContextData()) {
498  continue;
499  }
500  // Execute the subquery and cache the result.
501  RelAlgExecutor ra_executor(executor_, cat_, query_state_);
502  RaExecutionSequence subquery_seq(subquery_ra);
503  auto result = ra_executor.executeRelAlgSeq(subquery_seq, co, eo, nullptr, 0);
504  subquery->setExecutionResult(std::make_shared<ExecutionResult>(result));
505  }
506  return executeRelAlgSeq(ed_seq, co, eo, render_info, queue_time_ms);
507 }
int32_t getErrorCode() const
Definition: ErrorHandling.h:55
void prepare_foreign_table_for_execution(const RelAlgNode &ra_node, const Catalog_Namespace::Catalog &catalog)
void prepare_for_system_table_execution(const RelAlgNode &ra_node, const Catalog_Namespace::Catalog &catalog, const CompilationOptions &co)
static const int32_t ERR_INTERRUPTED
Definition: Execute.h:1165
ExecutionResult executeRelAlgSeq(const RaExecutionSequence &seq, const CompilationOptions &co, const ExecutionOptions &eo, RenderInfo *render_info, const int64_t queue_time_ms, const bool with_existing_temp_tables=false)
ExecutionResult executeRelAlgQueryWithFilterPushDown(const RaExecutionSequence &seq, const CompilationOptions &co, const ExecutionOptions &eo, RenderInfo *render_info, const int64_t queue_time_ms)
bool setInSituDataIfUnset(const bool is_in_situ_data)
Definition: RenderInfo.cpp:97
std::string join(T const &container, std::string const &delim)
DEVICE void sort(ARGS &&...args)
Definition: gpu_enabled.h:105
TypeR::rep timer_stop(Type clock_begin)
Definition: measure.h:48
bool g_enable_dynamic_watchdog
Definition: Execute.cpp:77
const std::vector< std::shared_ptr< RexSubQuery > > & getSubqueries() const noexcept
std::string to_string(char const *&&v)
bool disallow_in_situ_only_if_final_ED_is_aggregate
Definition: RenderInfo.h:40
#define INJECT_TIMER(DESC)
Definition: measure.h:93
A container for relational algebra descriptors defining the execution order for a relational algebra ...
const Catalog_Namespace::Catalog & cat_
ExecutionResult executeRelAlgQueryNoRetry(const CompilationOptions &co, const ExecutionOptions &eo, const bool just_explain_plan, RenderInfo *render_info)
std::shared_ptr< const query_state::QueryState > query_state_
#define CHECK(condition)
Definition: Logger.h:211
#define DEBUG_TIMER(name)
Definition: Logger.h:358
std::unique_ptr< RelAlgDagBuilder > query_dag_
void cleanupPostExecution()
std::unordered_set< PhysicalInput > get_physical_inputs(const RelAlgNode *ra)
DEVICE void reverse(ARGS &&...args)
Definition: gpu_enabled.h:96
Executor * executor_
std::unordered_set< int > get_physical_table_inputs(const RelAlgNode *ra)
Type timer_start()
Definition: measure.h:42

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

QueryStepExecutionResult RelAlgExecutor::executeRelAlgQuerySingleStep ( const RaExecutionSequence seq,
const size_t  step_idx,
const CompilationOptions co,
const ExecutionOptions eo,
RenderInfo render_info 
)

Definition at line 559 of file RelAlgExecutor.cpp.

References ExecutionOptions::allow_loop_joins, ExecutionOptions::allow_multifrag, ExecutionOptions::allow_runtime_query_interrupt, CHECK, CHECK_EQ, anonymous_namespace{RelAlgExecutor.cpp}::check_sort_node_source_constraint(), createSortInputWorkUnit(), ExecutionOptions::dynamic_watchdog_time_limit, executeRelAlgSubSeq(), executor_, ExecutionOptions::executor_type, ExecutionOptions::find_push_down_candidates, RaExecutionSequence::getDescriptor(), ExecutionOptions::gpu_input_mem_limit_percent, INJECT_TIMER, ExecutionOptions::jit_debug, ExecutionOptions::just_calcite_explain, ExecutionOptions::just_explain, ExecutionOptions::just_validate, anonymous_namespace{RelAlgExecutor.cpp}::node_is_aggregate(), ExecutionOptions::output_columnar_hint, ExecutionOptions::pending_query_interrupt_freq, post_execution_callback_, queue_time_ms_, Reduce, run_benchmark_import::result, ExecutionOptions::running_query_interrupt_freq, GroupByAndAggregate::shard_count_for_top_groups(), gpu_enabled::sort(), Union, VLOG, ExecutionOptions::with_dynamic_watchdog, and ExecutionOptions::with_watchdog.

564  {
565  INJECT_TIMER(executeRelAlgQueryStep);
566 
567  auto exe_desc_ptr = seq.getDescriptor(step_idx);
568  CHECK(exe_desc_ptr);
569  const auto sort = dynamic_cast<const RelSort*>(exe_desc_ptr->getBody());
570 
571  size_t shard_count{0};
572  auto merge_type = [&shard_count](const RelAlgNode* body) -> MergeType {
573  return node_is_aggregate(body) && !shard_count ? MergeType::Reduce : MergeType::Union;
574  };
575 
576  if (sort) {
578  const auto source_work_unit = createSortInputWorkUnit(sort, eo);
580  source_work_unit.exe_unit, *executor_->getCatalog());
581  if (!shard_count) {
582  // No point in sorting on the leaf, only execute the input to the sort node.
583  CHECK_EQ(size_t(1), sort->inputCount());
584  const auto source = sort->getInput(0);
585  if (sort->collationCount() || node_is_aggregate(source)) {
586  auto temp_seq = RaExecutionSequence(std::make_unique<RaExecutionDesc>(source));
587  CHECK_EQ(temp_seq.size(), size_t(1));
588  ExecutionOptions eo_copy = {
590  eo.allow_multifrag,
591  eo.just_explain,
592  eo.allow_loop_joins,
593  eo.with_watchdog,
594  eo.jit_debug,
595  eo.just_validate || sort->isEmptyResult(),
604  eo.executor_type,
605  };
606  // Use subseq to avoid clearing existing temporary tables
607  return {
608  executeRelAlgSubSeq(temp_seq, std::make_pair(0, 1), co, eo_copy, nullptr, 0),
609  merge_type(source),
610  source->getId(),
611  false};
612  }
613  }
614  }
617  std::make_pair(step_idx, step_idx + 1),
618  co,
619  eo,
620  render_info,
622  merge_type(exe_desc_ptr->getBody()),
623  exe_desc_ptr->getBody()->getId(),
624  false};
626  VLOG(1) << "Running post execution callback.";
627  (*post_execution_callback_)();
628  }
629  return result;
630 }
int64_t queue_time_ms_
#define CHECK_EQ(x, y)
Definition: Logger.h:219
std::optional< std::function< void()> > post_execution_callback_
RaExecutionDesc * getDescriptor(size_t idx) const
DEVICE void sort(ARGS &&...args)
Definition: gpu_enabled.h:105
void check_sort_node_source_constraint(const RelSort *sort)
double running_query_interrupt_freq
ExecutorType executor_type
#define INJECT_TIMER(DESC)
Definition: measure.h:93
MergeType
A container for relational algebra descriptors defining the execution order for a relational algebra ...
ExecutionResult executeRelAlgSubSeq(const RaExecutionSequence &seq, const std::pair< size_t, size_t > interval, const CompilationOptions &co, const ExecutionOptions &eo, RenderInfo *render_info, const int64_t queue_time_ms)
unsigned pending_query_interrupt_freq
bool node_is_aggregate(const RelAlgNode *ra)
#define CHECK(condition)
Definition: Logger.h:211
double gpu_input_mem_limit_percent
unsigned dynamic_watchdog_time_limit
Executor * executor_
#define VLOG(n)
Definition: Logger.h:305
WorkUnit createSortInputWorkUnit(const RelSort *, const ExecutionOptions &eo)
static size_t shard_count_for_top_groups(const RelAlgExecutionUnit &ra_exe_unit, const Catalog_Namespace::Catalog &catalog)

+ Here is the call graph for this function:

ExecutionResult RelAlgExecutor::executeRelAlgQueryWithFilterPushDown ( const RaExecutionSequence seq,
const CompilationOptions co,
const ExecutionOptions eo,
RenderInfo render_info,
const int64_t  queue_time_ms 
)

Definition at line 145 of file JoinFilterPushDown.cpp.

References ExecutionOptions::allow_loop_joins, ExecutionOptions::allow_multifrag, ExecutionOptions::allow_runtime_query_interrupt, cat_, CHECK, ExecutionOptions::dynamic_watchdog_time_limit, executeRelAlgSeq(), executor_, ExecutionOptions::find_push_down_candidates, getSubqueries(), ExecutionOptions::gpu_input_mem_limit_percent, ExecutionOptions::jit_debug, ExecutionOptions::just_calcite_explain, ExecutionOptions::just_explain, ExecutionOptions::just_validate, ExecutionOptions::output_columnar_hint, ExecutionOptions::pending_query_interrupt_freq, run_benchmark_import::result, ExecutionOptions::running_query_interrupt_freq, RaExecutionSequence::size(), ExecutionOptions::with_dynamic_watchdog, and ExecutionOptions::with_watchdog.

Referenced by executeRelAlgQueryNoRetry().

150  {
151  // we currently do not fully support filter push down with
152  // multi-step execution and/or with subqueries
153  // TODO(Saman): add proper caching to enable filter push down for all cases
154  const auto& subqueries = getSubqueries();
155  if (seq.size() > 1 || !subqueries.empty()) {
156  if (eo.just_calcite_explain) {
157  return ExecutionResult(std::vector<PushedDownFilterInfo>{},
159  }
160  const ExecutionOptions eo_modified{eo.output_columnar_hint,
161  eo.allow_multifrag,
162  eo.just_explain,
163  eo.allow_loop_joins,
164  eo.with_watchdog,
165  eo.jit_debug,
166  eo.just_validate,
169  /*find_push_down_candidates=*/false,
170  /*just_calcite_explain=*/false,
175 
176  // Dispatch the subqueries first
177  for (auto subquery : subqueries) {
178  // Execute the subquery and cache the result.
179  RelAlgExecutor ra_executor(executor_, cat_);
180  const auto subquery_ra = subquery->getRelAlg();
181  CHECK(subquery_ra);
182  RaExecutionSequence subquery_seq(subquery_ra);
183  auto result =
184  ra_executor.executeRelAlgSeq(subquery_seq, co, eo_modified, nullptr, 0);
185  subquery->setExecutionResult(std::make_shared<ExecutionResult>(result));
186  }
187  return executeRelAlgSeq(seq, co, eo_modified, render_info, queue_time_ms);
188  }
189  // else
190  return executeRelAlgSeq(seq, co, eo, render_info, queue_time_ms);
191 }
ExecutionResult executeRelAlgSeq(const RaExecutionSequence &seq, const CompilationOptions &co, const ExecutionOptions &eo, RenderInfo *render_info, const int64_t queue_time_ms, const bool with_existing_temp_tables=false)
const std::vector< std::shared_ptr< RexSubQuery > > & getSubqueries() const noexcept
double running_query_interrupt_freq
A container for relational algebra descriptors defining the execution order for a relational algebra ...
const Catalog_Namespace::Catalog & cat_
unsigned pending_query_interrupt_freq
#define CHECK(condition)
Definition: Logger.h:211
double gpu_input_mem_limit_percent
unsigned dynamic_watchdog_time_limit
Executor * executor_

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

ExecutionResult RelAlgExecutor::executeRelAlgSeq ( const RaExecutionSequence seq,
const CompilationOptions co,
const ExecutionOptions eo,
RenderInfo render_info,
const int64_t  queue_time_ms,
const bool  with_existing_temp_tables = false 
)

Definition at line 649 of file RelAlgExecutor.cpp.

References cat_, CHECK, CHECK_GE, DEBUG_TIMER, CompilationOptions::device_type, RaExecutionSequence::empty(), executeRelAlgStep(), executor_, ExecutionOptions::executor_type, g_allow_query_step_cpu_retry, g_cluster, g_enable_interop, RaExecutionDesc::getBody(), RaExecutionSequence::getDescriptor(), GPU, i, logger::INFO, INJECT_TIMER, ExecutionOptions::just_explain, left_deep_join_info_, LOG, CompilationOptions::makeCpuOnly(), now_, RaExecutionSequence::size(), gpu_enabled::swap(), target_exprs_owned_, temporary_tables_, and VLOG.

Referenced by executeRelAlgQueryNoRetry(), and executeRelAlgQueryWithFilterPushDown().

654  {
656  auto timer = DEBUG_TIMER(__func__);
657  if (!with_existing_temp_tables) {
659  }
662  executor_->setCatalog(&cat_);
663  executor_->temporary_tables_ = &temporary_tables_;
664 
665  time(&now_);
666  CHECK(!seq.empty());
667 
668  auto get_descriptor_count = [&seq, &eo]() -> size_t {
669  if (eo.just_explain) {
670  if (dynamic_cast<const RelLogicalValues*>(seq.getDescriptor(0)->getBody())) {
671  // run the logical values descriptor to generate the result set, then the next
672  // descriptor to generate the explain
673  CHECK_GE(seq.size(), size_t(2));
674  return 2;
675  } else {
676  return 1;
677  }
678  } else {
679  return seq.size();
680  }
681  };
682 
683  const auto exec_desc_count = get_descriptor_count();
684  // this join info needs to be maintained throughout an entire query runtime
685  for (size_t i = 0; i < exec_desc_count; i++) {
686  VLOG(1) << "Executing query step " << i;
687  // only render on the last step
688  try {
689  executeRelAlgStep(seq,
690  i,
691  co,
692  eo,
693  (i == exec_desc_count - 1) ? render_info : nullptr,
694  queue_time_ms);
695  } catch (const QueryMustRunOnCpu&) {
696  // Do not allow per-step retry if flag is off or in distributed mode
697  // TODO(todd): Determine if and when we can relax this restriction
698  // for distributed
701  throw;
702  }
703  LOG(INFO) << "Retrying current query step " << i << " on CPU";
704  const auto co_cpu = CompilationOptions::makeCpuOnly(co);
705  executeRelAlgStep(seq,
706  i,
707  co_cpu,
708  eo,
709  (i == exec_desc_count - 1) ? render_info : nullptr,
710  queue_time_ms);
711  } catch (const NativeExecutionError&) {
712  if (!g_enable_interop) {
713  throw;
714  }
715  auto eo_extern = eo;
716  eo_extern.executor_type = ::ExecutorType::Extern;
717  auto exec_desc_ptr = seq.getDescriptor(i);
718  const auto body = exec_desc_ptr->getBody();
719  const auto compound = dynamic_cast<const RelCompound*>(body);
720  if (compound && (compound->getGroupByCount() || compound->isAggregate())) {
721  LOG(INFO) << "Also failed to run the query using interoperability";
722  throw;
723  }
724  executeRelAlgStep(seq,
725  i,
726  co,
727  eo_extern,
728  (i == exec_desc_count - 1) ? render_info : nullptr,
729  queue_time_ms);
730  }
731  }
732 
733  return seq.getDescriptor(exec_desc_count - 1)->getResult();
734 }
RaExecutionDesc * getDescriptor(size_t idx) const
ExecutionResult executeRelAlgSeq(const RaExecutionSequence &seq, const CompilationOptions &co, const ExecutionOptions &eo, RenderInfo *render_info, const int64_t queue_time_ms, const bool with_existing_temp_tables=false)
#define LOG(tag)
Definition: Logger.h:205
TemporaryTables temporary_tables_
#define CHECK_GE(x, y)
Definition: Logger.h:224
bool g_enable_interop
std::vector< std::shared_ptr< Analyzer::Expr > > target_exprs_owned_
static CompilationOptions makeCpuOnly(const CompilationOptions &in)
ExecutorType executor_type
#define INJECT_TIMER(DESC)
Definition: measure.h:93
const Catalog_Namespace::Catalog & cat_
void executeRelAlgStep(const RaExecutionSequence &seq, const size_t step_idx, const CompilationOptions &, const ExecutionOptions &, RenderInfo *, const int64_t queue_time_ms)
ExecutorDeviceType device_type
std::unordered_map< unsigned, JoinQualsPerNestingLevel > left_deep_join_info_
#define CHECK(condition)
Definition: Logger.h:211
#define DEBUG_TIMER(name)
Definition: Logger.h:358
const RelAlgNode * getBody() const
bool g_allow_query_step_cpu_retry
Definition: Execute.cpp:83
bool g_cluster
Executor * executor_
DEVICE void swap(ARGS &&...args)
Definition: gpu_enabled.h:114
#define VLOG(n)
Definition: Logger.h:305

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void RelAlgExecutor::executeRelAlgStep ( const RaExecutionSequence seq,
const size_t  step_idx,
const CompilationOptions co,
const ExecutionOptions eo,
RenderInfo render_info,
const int64_t  queue_time_ms 
)
private

Definition at line 779 of file RelAlgExecutor.cpp.

References addTemporaryTable(), ExecutionOptions::allow_loop_joins, ExecutionOptions::allow_multifrag, ExecutionOptions::allow_runtime_query_interrupt, cat_, CHECK, CPU, DEBUG_TIMER, CompilationOptions::device_type, ExecutionOptions::dynamic_watchdog_time_limit, executeAggregate(), executeCompound(), executeDelete(), executeFilter(), executeLogicalValues(), executeModify(), executeProject(), executeSort(), executeTableFunction(), executeUnion(), executeUpdate(), executor_, ExecutionOptions::executor_type, logger::FATAL, ExecutionOptions::find_push_down_candidates, g_cluster, g_skip_intermediate_count, RelAlgNode::getContextData(), RaExecutionSequence::getDescriptor(), RaExecutionSequence::getDescriptorByBodyId(), RelAlgNode::getId(), RelAlgNode::getInput(), getParsedQueryHint(), ExecutionOptions::gpu_input_mem_limit_percent, handleNop(), RelAlgNode::hasContextData(), logger::INFO, INJECT_TIMER, ExecutionOptions::jit_debug, ExecutionOptions::just_calcite_explain, ExecutionOptions::just_explain, ExecutionOptions::just_validate, kColumnarOutput, kCpuMode, kRowwiseOutput, LOG, ExecutionOptions::outer_fragment_indices, ExecutionOptions::output_columnar_hint, ExecutionOptions::pending_query_interrupt_freq, anonymous_namespace{RelAlgExecutor.cpp}::prepare_foreign_table_for_execution(), WindowProjectNodeContext::reset(), ExecutionOptions::running_query_interrupt_freq, gpu_enabled::sort(), VLOG, ExecutionOptions::with_dynamic_watchdog, and ExecutionOptions::with_watchdog.

Referenced by executeRelAlgSeq(), and executeRelAlgSubSeq().

784  {
786  auto timer = DEBUG_TIMER(__func__);
788  auto exec_desc_ptr = seq.getDescriptor(step_idx);
789  CHECK(exec_desc_ptr);
790  auto& exec_desc = *exec_desc_ptr;
791  const auto body = exec_desc.getBody();
792  if (body->isNop()) {
793  handleNop(exec_desc);
794  return;
795  }
796 
797  const ExecutionOptions eo_work_unit{
799  eo.allow_multifrag,
800  eo.just_explain,
801  eo.allow_loop_joins,
802  eo.with_watchdog && (step_idx == 0 || dynamic_cast<const RelProject*>(body)),
803  eo.jit_debug,
804  eo.just_validate,
813  eo.executor_type,
814  step_idx == 0 ? eo.outer_fragment_indices : std::vector<size_t>()};
815 
816  auto handle_hint = [co,
817  eo_work_unit,
818  body,
819  this]() -> std::pair<CompilationOptions, ExecutionOptions> {
820  ExecutionOptions eo_hint_applied = eo_work_unit;
821  CompilationOptions co_hint_applied = co;
822  auto target_node = body;
823  if (auto sort_body = dynamic_cast<const RelSort*>(body)) {
824  target_node = sort_body->getInput(0);
825  }
826  auto query_hints = getParsedQueryHint(target_node);
827  auto columnar_output_hint_enabled = false;
828  auto rowwise_output_hint_enabled = false;
829  if (query_hints) {
830  if (query_hints->isHintRegistered(QueryHint::kCpuMode)) {
831  VLOG(1) << "A user forces to run the query on the CPU execution mode";
832  co_hint_applied.device_type = ExecutorDeviceType::CPU;
833  }
834  if (query_hints->isHintRegistered(QueryHint::kColumnarOutput)) {
835  VLOG(1) << "A user forces the query to run with columnar output";
836  columnar_output_hint_enabled = true;
837  } else if (query_hints->isHintRegistered(QueryHint::kRowwiseOutput)) {
838  VLOG(1) << "A user forces the query to run with rowwise output";
839  rowwise_output_hint_enabled = true;
840  }
841  }
842  auto columnar_output_enabled = eo_work_unit.output_columnar_hint
843  ? !rowwise_output_hint_enabled
844  : columnar_output_hint_enabled;
845  if (g_cluster && (columnar_output_hint_enabled || rowwise_output_hint_enabled)) {
846  LOG(INFO) << "Currently, we do not support applying query hint to change query "
847  "output layout in distributed mode.";
848  }
849  eo_hint_applied.output_columnar_hint = columnar_output_enabled;
850  return std::make_pair(co_hint_applied, eo_hint_applied);
851  };
852 
853  // Notify foreign tables to load prior to execution
855  auto hint_applied = handle_hint();
856  const auto compound = dynamic_cast<const RelCompound*>(body);
857  if (compound) {
858  if (compound->isDeleteViaSelect()) {
859  executeDelete(compound, hint_applied.first, hint_applied.second, queue_time_ms);
860  } else if (compound->isUpdateViaSelect()) {
861  executeUpdate(compound, hint_applied.first, hint_applied.second, queue_time_ms);
862  } else {
863  exec_desc.setResult(executeCompound(
864  compound, hint_applied.first, hint_applied.second, render_info, queue_time_ms));
865  VLOG(3) << "Returned from executeCompound(), addTemporaryTable("
866  << static_cast<int>(-compound->getId()) << ", ...)"
867  << " exec_desc.getResult().getDataPtr()->rowCount()="
868  << exec_desc.getResult().getDataPtr()->rowCount();
869  if (exec_desc.getResult().isFilterPushDownEnabled()) {
870  return;
871  }
872  addTemporaryTable(-compound->getId(), exec_desc.getResult().getDataPtr());
873  }
874  return;
875  }
876  const auto project = dynamic_cast<const RelProject*>(body);
877  if (project) {
878  if (project->isDeleteViaSelect()) {
879  executeDelete(project, hint_applied.first, hint_applied.second, queue_time_ms);
880  } else if (project->isUpdateViaSelect()) {
881  executeUpdate(project, hint_applied.first, hint_applied.second, queue_time_ms);
882  } else {
883  std::optional<size_t> prev_count;
884  // Disabling the intermediate count optimization in distributed, as the previous
885  // execution descriptor will likely not hold the aggregated result.
886  if (g_skip_intermediate_count && step_idx > 0 && !g_cluster) {
887  // If the previous node produced a reliable count, skip the pre-flight count.
888  RelAlgNode const* const prev_body = project->getInput(0);
889  if (shared::dynamic_castable_to_any<RelCompound, RelLogicalValues>(prev_body)) {
890  if (RaExecutionDesc const* const prev_exec_desc =
891  prev_body->hasContextData()
892  ? prev_body->getContextData()
893  : seq.getDescriptorByBodyId(prev_body->getId(), step_idx - 1)) {
894  const auto& prev_exe_result = prev_exec_desc->getResult();
895  const auto prev_result = prev_exe_result.getRows();
896  if (prev_result) {
897  prev_count = prev_result->rowCount();
898  VLOG(3) << "Setting output row count for projection node to previous node ("
899  << prev_exec_desc->getBody()->toString() << ") to " << *prev_count;
900  }
901  }
902  }
903  }
904  exec_desc.setResult(executeProject(project,
905  hint_applied.first,
906  hint_applied.second,
907  render_info,
908  queue_time_ms,
909  prev_count));
910  VLOG(3) << "Returned from executeProject(), addTemporaryTable("
911  << static_cast<int>(-project->getId()) << ", ...)"
912  << " exec_desc.getResult().getDataPtr()->rowCount()="
913  << exec_desc.getResult().getDataPtr()->rowCount();
914  if (exec_desc.getResult().isFilterPushDownEnabled()) {
915  return;
916  }
917  addTemporaryTable(-project->getId(), exec_desc.getResult().getDataPtr());
918  }
919  return;
920  }
921  const auto aggregate = dynamic_cast<const RelAggregate*>(body);
922  if (aggregate) {
923  exec_desc.setResult(executeAggregate(
924  aggregate, hint_applied.first, hint_applied.second, render_info, queue_time_ms));
925  addTemporaryTable(-aggregate->getId(), exec_desc.getResult().getDataPtr());
926  return;
927  }
928  const auto filter = dynamic_cast<const RelFilter*>(body);
929  if (filter) {
930  exec_desc.setResult(executeFilter(
931  filter, hint_applied.first, hint_applied.second, render_info, queue_time_ms));
932  addTemporaryTable(-filter->getId(), exec_desc.getResult().getDataPtr());
933  return;
934  }
935  const auto sort = dynamic_cast<const RelSort*>(body);
936  if (sort) {
937  exec_desc.setResult(executeSort(
938  sort, hint_applied.first, hint_applied.second, render_info, queue_time_ms));
939  if (exec_desc.getResult().isFilterPushDownEnabled()) {
940  return;
941  }
942  addTemporaryTable(-sort->getId(), exec_desc.getResult().getDataPtr());
943  return;
944  }
945  const auto logical_values = dynamic_cast<const RelLogicalValues*>(body);
946  if (logical_values) {
947  exec_desc.setResult(executeLogicalValues(logical_values, hint_applied.second));
948  addTemporaryTable(-logical_values->getId(), exec_desc.getResult().getDataPtr());
949  return;
950  }
951  const auto modify = dynamic_cast<const RelModify*>(body);
952  if (modify) {
953  exec_desc.setResult(executeModify(modify, hint_applied.second));
954  return;
955  }
956  const auto logical_union = dynamic_cast<const RelLogicalUnion*>(body);
957  if (logical_union) {
958  exec_desc.setResult(executeUnion(logical_union,
959  seq,
960  hint_applied.first,
961  hint_applied.second,
962  render_info,
963  queue_time_ms));
964  addTemporaryTable(-logical_union->getId(), exec_desc.getResult().getDataPtr());
965  return;
966  }
967  const auto table_func = dynamic_cast<const RelTableFunction*>(body);
968  if (table_func) {
969  exec_desc.setResult(executeTableFunction(
970  table_func, hint_applied.first, hint_applied.second, queue_time_ms));
971  addTemporaryTable(-table_func->getId(), exec_desc.getResult().getDataPtr());
972  return;
973  }
974  LOG(FATAL) << "Unhandled body type: " << body->toString();
975 }
ExecutionResult executeAggregate(const RelAggregate *aggregate, const CompilationOptions &co, const ExecutionOptions &eo, RenderInfo *render_info, const int64_t queue_time_ms)
RaExecutionDesc * getDescriptor(size_t idx) const
void prepare_foreign_table_for_execution(const RelAlgNode &ra_node, const Catalog_Namespace::Catalog &catalog)
bool g_skip_intermediate_count
#define LOG(tag)
Definition: Logger.h:205
std::vector< size_t > outer_fragment_indices
DEVICE void sort(ARGS &&...args)
Definition: gpu_enabled.h:105
ExecutionResult executeModify(const RelModify *modify, const ExecutionOptions &eo)
void addTemporaryTable(const int table_id, const ResultSetPtr &result)
ExecutionResult executeLogicalValues(const RelLogicalValues *, const ExecutionOptions &)
std::optional< RegisteredQueryHint > getParsedQueryHint(const RelAlgNode *node)
ExecutionResult executeFilter(const RelFilter *, const CompilationOptions &, const ExecutionOptions &, RenderInfo *, const int64_t queue_time_ms)
double running_query_interrupt_freq
void handleNop(RaExecutionDesc &ed)
unsigned getId() const
ExecutorType executor_type
#define INJECT_TIMER(DESC)
Definition: measure.h:93
static void reset(Executor *executor)
void executeUpdate(const RelAlgNode *node, const CompilationOptions &co, const ExecutionOptions &eo, const int64_t queue_time_ms)
const Catalog_Namespace::Catalog & cat_
const RelAlgNode * getInput(const size_t idx) const
void executeRelAlgStep(const RaExecutionSequence &seq, const size_t step_idx, const CompilationOptions &, const ExecutionOptions &, RenderInfo *, const int64_t queue_time_ms)
unsigned pending_query_interrupt_freq
ExecutorDeviceType device_type
ExecutionResult executeTableFunction(const RelTableFunction *, const CompilationOptions &, const ExecutionOptions &, const int64_t queue_time_ms)
const RaExecutionDesc * getContextData() const
ExecutionResult executeUnion(const RelLogicalUnion *, const RaExecutionSequence &, const CompilationOptions &, const ExecutionOptions &, RenderInfo *, const int64_t queue_time_ms)
ExecutionResult executeSort(const RelSort *, const CompilationOptions &, const ExecutionOptions &, RenderInfo *, const int64_t queue_time_ms)
ExecutionResult executeProject(const RelProject *, const CompilationOptions &, const ExecutionOptions &, RenderInfo *, const int64_t queue_time_ms, const std::optional< size_t > previous_count)
#define CHECK(condition)
Definition: Logger.h:211
#define DEBUG_TIMER(name)
Definition: Logger.h:358
ExecutionResult executeCompound(const RelCompound *, const CompilationOptions &, const ExecutionOptions &, RenderInfo *, const int64_t queue_time_ms)
double gpu_input_mem_limit_percent
bool g_cluster
unsigned dynamic_watchdog_time_limit
void executeDelete(const RelAlgNode *node, const CompilationOptions &co, const ExecutionOptions &eo_in, const int64_t queue_time_ms)
RaExecutionDesc * getDescriptorByBodyId(unsigned const body_id, size_t const start_idx) const
Executor * executor_
bool hasContextData() const
#define VLOG(n)
Definition: Logger.h:305

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

ExecutionResult RelAlgExecutor::executeRelAlgSubSeq ( const RaExecutionSequence seq,
const std::pair< size_t, size_t >  interval,
const CompilationOptions co,
const ExecutionOptions eo,
RenderInfo render_info,
const int64_t  queue_time_ms 
)

Definition at line 736 of file RelAlgExecutor.cpp.

References cat_, CHECK, CompilationOptions::device_type, executeRelAlgStep(), executor_, g_allow_query_step_cpu_retry, g_cluster, RaExecutionSequence::getDescriptor(), GPU, i, logger::INFO, INJECT_TIMER, left_deep_join_info_, LOG, CompilationOptions::makeCpuOnly(), now_, gpu_enabled::swap(), and temporary_tables_.

Referenced by executeRelAlgQuerySingleStep().

742  {
744  executor_->setCatalog(&cat_);
745  executor_->temporary_tables_ = &temporary_tables_;
747  time(&now_);
748  for (size_t i = interval.first; i < interval.second; i++) {
749  // only render on the last step
750  try {
751  executeRelAlgStep(seq,
752  i,
753  co,
754  eo,
755  (i == interval.second - 1) ? render_info : nullptr,
756  queue_time_ms);
757  } catch (const QueryMustRunOnCpu&) {
758  // Do not allow per-step retry if flag is off or in distributed mode
759  // TODO(todd): Determine if and when we can relax this restriction
760  // for distributed
763  throw;
764  }
765  LOG(INFO) << "Retrying current query step " << i << " on CPU";
766  const auto co_cpu = CompilationOptions::makeCpuOnly(co);
767  executeRelAlgStep(seq,
768  i,
769  co_cpu,
770  eo,
771  (i == interval.second - 1) ? render_info : nullptr,
772  queue_time_ms);
773  }
774  }
775 
776  return seq.getDescriptor(interval.second - 1)->getResult();
777 }
RaExecutionDesc * getDescriptor(size_t idx) const
#define LOG(tag)
Definition: Logger.h:205
TemporaryTables temporary_tables_
static CompilationOptions makeCpuOnly(const CompilationOptions &in)
#define INJECT_TIMER(DESC)
Definition: measure.h:93
const Catalog_Namespace::Catalog & cat_
ExecutionResult executeRelAlgSubSeq(const RaExecutionSequence &seq, const std::pair< size_t, size_t > interval, const CompilationOptions &co, const ExecutionOptions &eo, RenderInfo *render_info, const int64_t queue_time_ms)
void executeRelAlgStep(const RaExecutionSequence &seq, const size_t step_idx, const CompilationOptions &, const ExecutionOptions &, RenderInfo *, const int64_t queue_time_ms)
ExecutorDeviceType device_type
std::unordered_map< unsigned, JoinQualsPerNestingLevel > left_deep_join_info_
#define CHECK(condition)
Definition: Logger.h:211
bool g_allow_query_step_cpu_retry
Definition: Execute.cpp:83
bool g_cluster
Executor * executor_
DEVICE void swap(ARGS &&...args)
Definition: gpu_enabled.h:114

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

ExecutionResult RelAlgExecutor::executeSimpleInsert ( const Analyzer::Query insert_query)

Definition at line 2369 of file RelAlgExecutor.cpp.

References append_datum(), DataBlockPtr::arraysPtr, cat_, CHECK, CHECK_EQ, checked_malloc(), Fragmenter_Namespace::InsertData::columnIds, CPU, Fragmenter_Namespace::InsertData::data, Fragmenter_Namespace::InsertData::databaseId, Catalog_Namespace::DBMetadata::dbId, Data_Namespace::DISK_LEVEL, executor_, import_export::fill_missing_columns(), g_cluster, get_column_descriptor(), SQLTypeInfo::get_compression(), SQLTypeInfo::get_elem_type(), Analyzer::Query::get_result_col_list(), Analyzer::Query::get_result_table_id(), anonymous_namespace{RelAlgExecutor.cpp}::get_shard_for_key(), SQLTypeInfo::get_size(), Analyzer::Query::get_targetlist(), SQLTypeInfo::get_type(), Catalog_Namespace::Catalog::getCurrentDB(), Catalog_Namespace::Catalog::getMetadataForDict(), Catalog_Namespace::Catalog::getMetadataForTable(), inline_fixed_encoding_null_val(), anonymous_namespace{RelAlgExecutor.cpp}::insert_one_dict_str(), is_null(), SQLTypeInfo::is_string(), kARRAY, kBIGINT, kBOOLEAN, kCAST, kCHAR, kDATE, kDECIMAL, kDOUBLE, kENCODING_DICT, kENCODING_NONE, kFLOAT, kINT, kLINESTRING, kMULTIPOLYGON, kNUMERIC, kPOINT, kPOLYGON, kSMALLINT, kTEXT, kTIME, kTIMESTAMP, kTINYINT, kVARCHAR, DataBlockPtr::numbersPtr, Fragmenter_Namespace::InsertData::numRows, anonymous_namespace{TypedDataAccessors.h}::put_null(), anonymous_namespace{TypedDataAccessors.h}::put_null_array(), DataBlockPtr::stringsPtr, Fragmenter_Namespace::InsertData::tableId, and to_string().

Referenced by Parser::InsertValuesStmt::execute().

2369  {
2370  // Note: We currently obtain an executor for this method, but we do not need it.
2371  // Therefore, we skip the executor state setup in the regular execution path. In the
2372  // future, we will likely want to use the executor to evaluate expressions in the insert
2373  // statement.
2374 
2375  const auto& targets = query.get_targetlist();
2376  const int table_id = query.get_result_table_id();
2377  const auto& col_id_list = query.get_result_col_list();
2378 
2379  std::vector<const ColumnDescriptor*> col_descriptors;
2380  std::vector<int> col_ids;
2381  std::unordered_map<int, std::unique_ptr<uint8_t[]>> col_buffers;
2382  std::unordered_map<int, std::vector<std::string>> str_col_buffers;
2383  std::unordered_map<int, std::vector<ArrayDatum>> arr_col_buffers;
2384 
2385  for (const int col_id : col_id_list) {
2386  const auto cd = get_column_descriptor(col_id, table_id, cat_);
2387  const auto col_enc = cd->columnType.get_compression();
2388  if (cd->columnType.is_string()) {
2389  switch (col_enc) {
2390  case kENCODING_NONE: {
2391  auto it_ok =
2392  str_col_buffers.insert(std::make_pair(col_id, std::vector<std::string>{}));
2393  CHECK(it_ok.second);
2394  break;
2395  }
2396  case kENCODING_DICT: {
2397  const auto dd = cat_.getMetadataForDict(cd->columnType.get_comp_param());
2398  CHECK(dd);
2399  const auto it_ok = col_buffers.emplace(
2400  col_id, std::make_unique<uint8_t[]>(cd->columnType.get_size()));
2401  CHECK(it_ok.second);
2402  break;
2403  }
2404  default:
2405  CHECK(false);
2406  }
2407  } else if (cd->columnType.is_geometry()) {
2408  auto it_ok =
2409  str_col_buffers.insert(std::make_pair(col_id, std::vector<std::string>{}));
2410  CHECK(it_ok.second);
2411  } else if (cd->columnType.is_array()) {
2412  auto it_ok =
2413  arr_col_buffers.insert(std::make_pair(col_id, std::vector<ArrayDatum>{}));
2414  CHECK(it_ok.second);
2415  } else {
2416  const auto it_ok = col_buffers.emplace(
2417  col_id,
2418  std::unique_ptr<uint8_t[]>(
2419  new uint8_t[cd->columnType.get_logical_size()]())); // changed to zero-init
2420  // the buffer
2421  CHECK(it_ok.second);
2422  }
2423  col_descriptors.push_back(cd);
2424  col_ids.push_back(col_id);
2425  }
2426  size_t col_idx = 0;
2428  insert_data.databaseId = cat_.getCurrentDB().dbId;
2429  insert_data.tableId = table_id;
2430  for (auto target_entry : targets) {
2431  auto col_cv = dynamic_cast<const Analyzer::Constant*>(target_entry->get_expr());
2432  if (!col_cv) {
2433  auto col_cast = dynamic_cast<const Analyzer::UOper*>(target_entry->get_expr());
2434  CHECK(col_cast);
2435  CHECK_EQ(kCAST, col_cast->get_optype());
2436  col_cv = dynamic_cast<const Analyzer::Constant*>(col_cast->get_operand());
2437  }
2438  CHECK(col_cv);
2439  const auto cd = col_descriptors[col_idx];
2440  auto col_datum = col_cv->get_constval();
2441  auto col_type = cd->columnType.get_type();
2442  uint8_t* col_data_bytes{nullptr};
2443  if (!cd->columnType.is_array() && !cd->columnType.is_geometry() &&
2444  (!cd->columnType.is_string() ||
2445  cd->columnType.get_compression() == kENCODING_DICT)) {
2446  const auto col_data_bytes_it = col_buffers.find(col_ids[col_idx]);
2447  CHECK(col_data_bytes_it != col_buffers.end());
2448  col_data_bytes = col_data_bytes_it->second.get();
2449  }
2450  switch (col_type) {
2451  case kBOOLEAN: {
2452  auto col_data = reinterpret_cast<int8_t*>(col_data_bytes);
2453  auto null_bool_val =
2454  col_datum.boolval == inline_fixed_encoding_null_val(cd->columnType);
2455  *col_data = col_cv->get_is_null() || null_bool_val
2456  ? inline_fixed_encoding_null_val(cd->columnType)
2457  : (col_datum.boolval ? 1 : 0);
2458  break;
2459  }
2460  case kTINYINT: {
2461  auto col_data = reinterpret_cast<int8_t*>(col_data_bytes);
2462  *col_data = col_cv->get_is_null() ? inline_fixed_encoding_null_val(cd->columnType)
2463  : col_datum.tinyintval;
2464  break;
2465  }
2466  case kSMALLINT: {
2467  auto col_data = reinterpret_cast<int16_t*>(col_data_bytes);
2468  *col_data = col_cv->get_is_null() ? inline_fixed_encoding_null_val(cd->columnType)
2469  : col_datum.smallintval;
2470  break;
2471  }
2472  case kINT: {
2473  auto col_data = reinterpret_cast<int32_t*>(col_data_bytes);
2474  *col_data = col_cv->get_is_null() ? inline_fixed_encoding_null_val(cd->columnType)
2475  : col_datum.intval;
2476  break;
2477  }
2478  case kBIGINT:
2479  case kDECIMAL:
2480  case kNUMERIC: {
2481  auto col_data = reinterpret_cast<int64_t*>(col_data_bytes);
2482  *col_data = col_cv->get_is_null() ? inline_fixed_encoding_null_val(cd->columnType)
2483  : col_datum.bigintval;
2484  break;
2485  }
2486  case kFLOAT: {
2487  auto col_data = reinterpret_cast<float*>(col_data_bytes);
2488  *col_data = col_datum.floatval;
2489  break;
2490  }
2491  case kDOUBLE: {
2492  auto col_data = reinterpret_cast<double*>(col_data_bytes);
2493  *col_data = col_datum.doubleval;
2494  break;
2495  }
2496  case kTEXT:
2497  case kVARCHAR:
2498  case kCHAR: {
2499  switch (cd->columnType.get_compression()) {
2500  case kENCODING_NONE:
2501  str_col_buffers[col_ids[col_idx]].push_back(
2502  col_datum.stringval ? *col_datum.stringval : "");
2503  break;
2504  case kENCODING_DICT: {
2505  switch (cd->columnType.get_size()) {
2506  case 1:
2508  reinterpret_cast<uint8_t*>(col_data_bytes), cd, col_cv, cat_);
2509  break;
2510  case 2:
2512  reinterpret_cast<uint16_t*>(col_data_bytes), cd, col_cv, cat_);
2513  break;
2514  case 4:
2516  reinterpret_cast<int32_t*>(col_data_bytes), cd, col_cv, cat_);
2517  break;
2518  default:
2519  CHECK(false);
2520  }
2521  break;
2522  }
2523  default:
2524  CHECK(false);
2525  }
2526  break;
2527  }
2528  case kTIME:
2529  case kTIMESTAMP:
2530  case kDATE: {
2531  auto col_data = reinterpret_cast<int64_t*>(col_data_bytes);
2532  *col_data = col_cv->get_is_null() ? inline_fixed_encoding_null_val(cd->columnType)
2533  : col_datum.bigintval;
2534  break;
2535  }
2536  case kARRAY: {
2537  const auto is_null = col_cv->get_is_null();
2538  const auto size = cd->columnType.get_size();
2539  const SQLTypeInfo elem_ti = cd->columnType.get_elem_type();
2540  // POINT coords: [un]compressed coords always need to be encoded, even if NULL
2541  const auto is_point_coords = (cd->isGeoPhyCol && elem_ti.get_type() == kTINYINT);
2542  if (is_null && !is_point_coords) {
2543  if (size > 0) {
2544  // NULL fixlen array: NULL_ARRAY sentinel followed by NULL sentinels
2545  int8_t* buf = (int8_t*)checked_malloc(size);
2546  put_null_array(static_cast<void*>(buf), elem_ti, "");
2547  for (int8_t* p = buf + elem_ti.get_size(); (p - buf) < size;
2548  p += elem_ti.get_size()) {
2549  put_null(static_cast<void*>(p), elem_ti, "");
2550  }
2551  arr_col_buffers[col_ids[col_idx]].emplace_back(size, buf, is_null);
2552  } else {
2553  arr_col_buffers[col_ids[col_idx]].emplace_back(0, nullptr, is_null);
2554  }
2555  break;
2556  }
2557  const auto l = col_cv->get_value_list();
2558  size_t len = l.size() * elem_ti.get_size();
2559  if (size > 0 && static_cast<size_t>(size) != len) {
2560  throw std::runtime_error("Array column " + cd->columnName + " expects " +
2561  std::to_string(size / elem_ti.get_size()) +
2562  " values, " + "received " + std::to_string(l.size()));
2563  }
2564  if (elem_ti.is_string()) {
2565  CHECK(kENCODING_DICT == elem_ti.get_compression());
2566  CHECK(4 == elem_ti.get_size());
2567 
2568  int8_t* buf = (int8_t*)checked_malloc(len);
2569  int32_t* p = reinterpret_cast<int32_t*>(buf);
2570 
2571  int elemIndex = 0;
2572  for (auto& e : l) {
2573  auto c = std::dynamic_pointer_cast<Analyzer::Constant>(e);
2574  CHECK(c);
2575  insert_one_dict_str(&p[elemIndex], cd->columnName, elem_ti, c.get(), cat_);
2576  elemIndex++;
2577  }
2578  arr_col_buffers[col_ids[col_idx]].push_back(ArrayDatum(len, buf, is_null));
2579 
2580  } else {
2581  int8_t* buf = (int8_t*)checked_malloc(len);
2582  int8_t* p = buf;
2583  for (auto& e : l) {
2584  auto c = std::dynamic_pointer_cast<Analyzer::Constant>(e);
2585  CHECK(c);
2586  p = append_datum(p, c->get_constval(), elem_ti);
2587  CHECK(p);
2588  }
2589  arr_col_buffers[col_ids[col_idx]].push_back(ArrayDatum(len, buf, is_null));
2590  }
2591  break;
2592  }
2593  case kPOINT:
2594  case kLINESTRING:
2595  case kPOLYGON:
2596  case kMULTIPOLYGON:
2597  str_col_buffers[col_ids[col_idx]].push_back(
2598  col_datum.stringval ? *col_datum.stringval : "");
2599  break;
2600  default:
2601  CHECK(false);
2602  }
2603  ++col_idx;
2604  }
2605  for (const auto& kv : col_buffers) {
2606  insert_data.columnIds.push_back(kv.first);
2607  DataBlockPtr p;
2608  p.numbersPtr = reinterpret_cast<int8_t*>(kv.second.get());
2609  insert_data.data.push_back(p);
2610  }
2611  for (auto& kv : str_col_buffers) {
2612  insert_data.columnIds.push_back(kv.first);
2613  DataBlockPtr p;
2614  p.stringsPtr = &kv.second;
2615  insert_data.data.push_back(p);
2616  }
2617  for (auto& kv : arr_col_buffers) {
2618  insert_data.columnIds.push_back(kv.first);
2619  DataBlockPtr p;
2620  p.arraysPtr = &kv.second;
2621  insert_data.data.push_back(p);
2622  }
2623  insert_data.numRows = 1;
2624  auto data_memory_holder = import_export::fill_missing_columns(&cat_, insert_data);
2625  const auto table_descriptor = cat_.getMetadataForTable(table_id);
2626  CHECK(table_descriptor);
2627  if (table_descriptor->nShards > 0) {
2628  auto shard = get_shard_for_key(table_descriptor, cat_, insert_data);
2629  CHECK(shard);
2630  shard->fragmenter->insertDataNoCheckpoint(insert_data);
2631  } else {
2632  table_descriptor->fragmenter->insertDataNoCheckpoint(insert_data);
2633  }
2634 
2635  // Ensure checkpoint happens across all shards, if not in distributed
2636  // mode (aggregator handles checkpointing in distributed mode)
2637  if (!g_cluster &&
2638  table_descriptor->persistenceLevel == Data_Namespace::MemoryLevel::DISK_LEVEL) {
2639  const_cast<Catalog_Namespace::Catalog&>(cat_).checkpointWithAutoRollback(table_id);
2640  }
2641 
2642  auto rs = std::make_shared<ResultSet>(TargetInfoList{},
2645  executor_->getRowSetMemoryOwner(),
2646  nullptr,
2647  0,
2648  0);
2649  std::vector<TargetMetaInfo> empty_targets;
2650  return {rs, empty_targets};
2651 }
#define CHECK_EQ(x, y)
Definition: Logger.h:219
HOST DEVICE int get_size() const
Definition: sqltypes.h:339
int8_t * append_datum(int8_t *buf, const Datum &d, const SQLTypeInfo &ti)
Definition: Datum.cpp:524
Class for a per-database catalog; also includes metadata for the current database and the current user.
Definition: Catalog.h:114
Definition: sqltypes.h:49
std::vector< std::string > * stringsPtr
Definition: sqltypes.h:227
std::vector< ArrayDatum > * arraysPtr
Definition: sqltypes.h:228
std::vector< std::unique_ptr< TypedImportBuffer > > fill_missing_columns(const Catalog_Namespace::Catalog *cat, Fragmenter_Namespace::InsertData &insert_data)
Definition: Importer.cpp:5954
int64_t insert_one_dict_str(T *col_data, const std::string &columnName, const SQLTypeInfo &columnType, const Analyzer::Constant *col_cv, const Catalog_Namespace::Catalog &catalog)
Definition: sqldefs.h:49
std::vector< TargetInfo > TargetInfoList
HOST DEVICE SQLTypes get_type() const
Definition: sqltypes.h:329
std::string to_string(char const *&&v)
int tableId
identifies the table into which the data is being inserted
Definition: Fragmenter.h:61
std::conditional_t< is_cuda_compiler(), DeviceArrayDatum, HostArrayDatum > ArrayDatum
Definition: sqltypes.h:208
size_t numRows
the number of rows being inserted
Definition: Fragmenter.h:63
CONSTEXPR DEVICE bool is_null(const T &value)
const DBMetadata & getCurrentDB() const
Definition: Catalog.h:225
void * checked_malloc(const size_t size)
Definition: checked_alloc.h:45
const TableDescriptor * get_shard_for_key(const TableDescriptor *td, const Catalog_Namespace::Catalog &cat, const Fragmenter_Namespace::InsertData &data)
void put_null_array(void *ndptr, const SQLTypeInfo &ntype, const std::string col_name)
const Catalog_Namespace::Catalog & cat_
const DictDescriptor * getMetadataForDict(int dict_ref, bool loadDict=true) const
Definition: Catalog.cpp:1554
void put_null(void *ndptr, const SQLTypeInfo &ntype, const std::string col_name)
Definition: sqltypes.h:52
Definition: sqltypes.h:53
HOST DEVICE EncodingType get_compression() const
Definition: sqltypes.h:337
std::vector< DataBlockPtr > data
the data blocks being inserted, one DataBlockPtr per entry in columnIds
Definition: Fragmenter.h:64
Definition: sqltypes.h:41
#define CHECK(condition)
Definition: Logger.h:211
int64_t inline_fixed_encoding_null_val(const SQL_TYPE_INFO &ti)
bool g_cluster
The data to be inserted using the fragment manager.
Definition: Fragmenter.h:59
Definition: sqltypes.h:45
const TableDescriptor * getMetadataForTable(const std::string &tableName, const bool populateFragmenter=true) const
Returns a pointer to a const TableDescriptor struct matching the provided tableName.
bool is_string() const
Definition: sqltypes.h:519
int8_t * numbersPtr
Definition: sqltypes.h:226
SQLTypeInfo get_elem_type() const
Definition: sqltypes.h:861
std::vector< int > columnIds
a vector of column ids for the row(s) being inserted
Definition: Fragmenter.h:62
Executor * executor_
const ColumnDescriptor * get_column_descriptor(const int col_id, const int table_id, const Catalog_Namespace::Catalog &cat)
Definition: Execute.h:191

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

ExecutionResult RelAlgExecutor::executeSort ( const RelSort sort,
const CompilationOptions co,
const ExecutionOptions eo,
RenderInfo render_info,
const int64_t  queue_time_ms 
)
private

Definition at line 2683 of file RelAlgExecutor.cpp.

References SpeculativeTopNBlacklist::add(), CHECK, CHECK_EQ, anonymous_namespace{RelAlgExecutor.cpp}::check_sort_node_source_constraint(), RelSort::collationCount(), createSortInputWorkUnit(), DEBUG_TIMER, executeWorkUnit(), executor_, anonymous_namespace{RelAlgExecutor.cpp}::first_oe_is_desc(), g_cluster, anonymous_namespace{RelAlgExecutor.cpp}::get_order_entries(), RelAlgNode::getId(), RelAlgNode::getInput(), RelSort::getLimit(), RelSort::getOffset(), ExecutionResult::getRows(), RelSort::isEmptyResult(), leaf_results_, anonymous_namespace{RelAlgExecutor.cpp}::node_is_aggregate(), ExecutionOptions::output_columnar_hint, run_benchmark_import::result, RelAlgNode::setOutputMetainfo(), gpu_enabled::sort(), speculative_topn_blacklist_, and use_speculative_top_n().

Referenced by executeRelAlgStep().

2687  {
2688  auto timer = DEBUG_TIMER(__func__);
2690  const auto source = sort->getInput(0);
2691  const bool is_aggregate = node_is_aggregate(source);
2692  auto it = leaf_results_.find(sort->getId());
2693  if (it != leaf_results_.end()) {
2694  // Add any transient string literals to the sdp on the agg
2695  const auto source_work_unit = createSortInputWorkUnit(sort, eo);
2696  executor_->addTransientStringLiterals(source_work_unit.exe_unit,
2697  executor_->row_set_mem_owner_);
2698  // Handle push-down for LIMIT for multi-node
2699  auto& aggregated_result = it->second;
2700  auto& result_rows = aggregated_result.rs;
2701  const size_t limit = sort->getLimit();
2702  const size_t offset = sort->getOffset();
2703  const auto order_entries = get_order_entries(sort);
2704  if (limit || offset) {
2705  if (!order_entries.empty()) {
2706  result_rows->sort(order_entries, limit + offset, executor_);
2707  }
2708  result_rows->dropFirstN(offset);
2709  if (limit) {
2710  result_rows->keepFirstN(limit);
2711  }
2712  }
2713  ExecutionResult result(result_rows, aggregated_result.targets_meta);
2714  sort->setOutputMetainfo(aggregated_result.targets_meta);
2715  return result;
2716  }
2717 
2718  std::list<std::shared_ptr<Analyzer::Expr>> groupby_exprs;
2719  bool is_desc{false};
2720 
2721  auto execute_sort_query = [this,
2722  sort,
2723  &source,
2724  &is_aggregate,
2725  &eo,
2726  &co,
2727  render_info,
2728  queue_time_ms,
2729  &groupby_exprs,
2730  &is_desc]() -> ExecutionResult {
2731  const auto source_work_unit = createSortInputWorkUnit(sort, eo);
2732  is_desc = first_oe_is_desc(source_work_unit.exe_unit.sort_info.order_entries);
2733  ExecutionOptions eo_copy = {
2735  eo.allow_multifrag,
2736  eo.just_explain,
2737  eo.allow_loop_joins,
2738  eo.with_watchdog,
2739  eo.jit_debug,
2740  eo.just_validate || sort->isEmptyResult(),
2741  eo.with_dynamic_watchdog,
2742  eo.dynamic_watchdog_time_limit,
2743  eo.find_push_down_candidates,
2744  eo.just_calcite_explain,
2745  eo.gpu_input_mem_limit_percent,
2746  eo.allow_runtime_query_interrupt,
2747  eo.running_query_interrupt_freq,
2748  eo.pending_query_interrupt_freq,
2749  eo.executor_type,
2750  };
2751 
2752  groupby_exprs = source_work_unit.exe_unit.groupby_exprs;
2753  auto source_result = executeWorkUnit(source_work_unit,
2754  source->getOutputMetainfo(),
2755  is_aggregate,
2756  co,
2757  eo_copy,
2758  render_info,
2759  queue_time_ms);
2760  if (render_info && render_info->isPotentialInSituRender()) {
2761  return source_result;
2762  }
2763  if (source_result.isFilterPushDownEnabled()) {
2764  return source_result;
2765  }
2766  auto rows_to_sort = source_result.getRows();
2767  if (eo.just_explain) {
2768  return {rows_to_sort, {}};
2769  }
2770  const size_t limit = sort->getLimit();
2771  const size_t offset = sort->getOffset();
2772  if (sort->collationCount() != 0 && !rows_to_sort->definitelyHasNoRows() &&
2773  !use_speculative_top_n(source_work_unit.exe_unit,
2774  rows_to_sort->getQueryMemDesc())) {
2775  const size_t top_n = limit == 0 ? 0 : limit + offset;
2776  rows_to_sort->sort(
2777  source_work_unit.exe_unit.sort_info.order_entries, top_n, executor_);
2778  }
2779  if (limit || offset) {
2780  if (g_cluster && sort->collationCount() == 0) {
2781  if (offset >= rows_to_sort->rowCount()) {
2782  rows_to_sort->dropFirstN(offset);
2783  } else {
2784  rows_to_sort->keepFirstN(limit + offset);
2785  }
2786  } else {
2787  rows_to_sort->dropFirstN(offset);
2788  if (limit) {
2789  rows_to_sort->keepFirstN(limit);
2790  }
2791  }
2792  }
2793  return {rows_to_sort, source_result.getTargetsMeta()};
2794  };
2795 
2796  try {
2797  return execute_sort_query();
2798  } catch (const SpeculativeTopNFailed& e) {
2799  CHECK_EQ(size_t(1), groupby_exprs.size());
2800  CHECK(groupby_exprs.front());
2801  speculative_topn_blacklist_.add(groupby_exprs.front(), is_desc);
2802  return execute_sort_query();
2803  }
2804 }
#define CHECK_EQ(x, y)
Definition: Logger.h:219
size_t getOffset() const
static SpeculativeTopNBlacklist speculative_topn_blacklist_
bool use_speculative_top_n(const RelAlgExecutionUnit &ra_exe_unit, const QueryMemoryDescriptor &query_mem_desc)
DEVICE void sort(ARGS &&...args)
Definition: gpu_enabled.h:105
void check_sort_node_source_constraint(const RelSort *sort)
std::unordered_map< unsigned, AggregatedResult > leaf_results_
unsigned getId() const
const std::shared_ptr< ResultSet > & getRows() const
const RelAlgNode * getInput(const size_t idx) const
bool node_is_aggregate(const RelAlgNode *ra)
bool isEmptyResult() const
ExecutionResult executeWorkUnit(const WorkUnit &work_unit, const std::vector< TargetMetaInfo > &targets_meta, const bool is_agg, const CompilationOptions &co_in, const ExecutionOptions &eo_in, RenderInfo *, const int64_t queue_time_ms, const std::optional< size_t > previous_count=std::nullopt)
size_t collationCount() const
size_t getLimit() const
#define CHECK(condition)
Definition: Logger.h:211
#define DEBUG_TIMER(name)
Definition: Logger.h:358
void setOutputMetainfo(const std::vector< TargetMetaInfo > &targets_metainfo) const
std::list< Analyzer::OrderEntry > get_order_entries(const RelSort *sort)
bool g_cluster
void add(const std::shared_ptr< Analyzer::Expr > expr, const bool desc)
Executor * executor_
WorkUnit createSortInputWorkUnit(const RelSort *, const ExecutionOptions &eo)
bool first_oe_is_desc(const std::list< Analyzer::OrderEntry > &order_entries)

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

ExecutionResult RelAlgExecutor::executeTableFunction ( const RelTableFunction table_func,
const CompilationOptions co_in,
const ExecutionOptions eo,
const int64_t  queue_time_ms 
)
private

Definition at line 1944 of file RelAlgExecutor.cpp.

References cat_, CHECK, createTableFunctionWorkUnit(), DEBUG_TIMER, CompilationOptions::device_type, Executor::ERR_OUT_OF_GPU_MEM, executor_, g_cluster, g_enable_table_functions, get_table_infos(), QueryExecutionError::getErrorCode(), GPU, handlePersistentError(), INJECT_TIMER, ExecutionOptions::just_explain, and run_benchmark_import::result.

Referenced by executeRelAlgStep().

1947  {
1949  auto timer = DEBUG_TIMER(__func__);
1950 
1951  auto co = co_in;
1952 
1953  if (g_cluster) {
1954  throw std::runtime_error("Table functions not supported in distributed mode yet");
1955  }
1956  if (!g_enable_table_functions) {
1957  throw std::runtime_error("Table function support is disabled");
1958  }
1959  auto table_func_work_unit = createTableFunctionWorkUnit(
1960  table_func,
1961  eo.just_explain,
1962  /*is_gpu = */ co.device_type == ExecutorDeviceType::GPU);
1963  const auto body = table_func_work_unit.body;
1964  CHECK(body);
1965 
1966  const auto table_infos =
1967  get_table_infos(table_func_work_unit.exe_unit.input_descs, executor_);
1968 
1969  ExecutionResult result{std::make_shared<ResultSet>(std::vector<TargetInfo>{},
1970  co.device_type,
1972  nullptr,
1973  executor_->getCatalog(),
1974  executor_->blockSize(),
1975  executor_->gridSize()),
1976  {}};
1977 
1978  try {
1979  result = {executor_->executeTableFunction(
1980  table_func_work_unit.exe_unit, table_infos, co, eo, cat_),
1981  body->getOutputMetainfo()};
1982  } catch (const QueryExecutionError& e) {
1985  throw std::runtime_error("Table function ran out of memory during execution");
1986  }
1987  result.setQueueTime(queue_time_ms);
1988  return result;
1989 }
int32_t getErrorCode() const
Definition: ErrorHandling.h:55
TableFunctionWorkUnit createTableFunctionWorkUnit(const RelTableFunction *table_func, const bool just_explain, const bool is_gpu)
#define INJECT_TIMER(DESC)
Definition: measure.h:93
const Catalog_Namespace::Catalog & cat_
static const int32_t ERR_OUT_OF_GPU_MEM
Definition: Execute.h:1158
ExecutionResult executeTableFunction(const RelTableFunction *, const CompilationOptions &, const ExecutionOptions &, const int64_t queue_time_ms)
static void handlePersistentError(const int32_t error_code)
#define CHECK(condition)
Definition: Logger.h:211
std::vector< InputTableInfo > get_table_infos(const std::vector< InputDescriptor > &input_descs, Executor *executor)
#define DEBUG_TIMER(name)
Definition: Logger.h:358
bool g_cluster
Executor * executor_
bool g_enable_table_functions
Definition: Execute.cpp:108

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

ExecutionResult RelAlgExecutor::executeUnion ( const RelLogicalUnion logical_union,
const RaExecutionSequence seq,
const CompilationOptions co,
const ExecutionOptions eo,
RenderInfo render_info,
const int64_t  queue_time_ms 
)
private

Definition at line 2165 of file RelAlgExecutor.cpp.

References RelLogicalUnion::checkForMatchingMetaInfoTypes(), createUnionWorkUnit(), DEBUG_TIMER, Default, executeWorkUnit(), RelAlgNode::getInput(), RelAlgNode::getOutputMetainfo(), RelLogicalUnion::isAll(), isGeometry(), CompilationOptions::makeCpuOnly(), and RelAlgNode::setOutputMetainfo().

Referenced by executeRelAlgStep().

2170  {
2171  auto timer = DEBUG_TIMER(__func__);
2172  if (!logical_union->isAll()) {
2173  throw std::runtime_error("UNION without ALL is not supported yet.");
2174  }
2175  // Will throw a std::runtime_error if types don't match.
2176  logical_union->checkForMatchingMetaInfoTypes();
2177  logical_union->setOutputMetainfo(logical_union->getInput(0)->getOutputMetainfo());
2178  if (boost::algorithm::any_of(logical_union->getOutputMetainfo(), isGeometry)) {
2179  throw std::runtime_error("UNION does not support subqueries with geo-columns.");
2180  }
2181  auto work_unit =
2182  createUnionWorkUnit(logical_union, {{}, SortAlgorithm::Default, 0, 0}, eo);
2183  return executeWorkUnit(work_unit,
2184  logical_union->getOutputMetainfo(),
2185  false,
2187  eo,
2188  render_info,
2189  queue_time_ms);
2190 }
bool isAll() const
void checkForMatchingMetaInfoTypes() const
static CompilationOptions makeCpuOnly(const CompilationOptions &in)
bool isGeometry(TargetMetaInfo const &target_meta_info)
const RelAlgNode * getInput(const size_t idx) const
WorkUnit createUnionWorkUnit(const RelLogicalUnion *, const SortInfo &, const ExecutionOptions &eo)
ExecutionResult executeWorkUnit(const WorkUnit &work_unit, const std::vector< TargetMetaInfo > &targets_meta, const bool is_agg, const CompilationOptions &co_in, const ExecutionOptions &eo_in, RenderInfo *, const int64_t queue_time_ms, const std::optional< size_t > previous_count=std::nullopt)
#define DEBUG_TIMER(name)
Definition: Logger.h:358
void setOutputMetainfo(const std::vector< TargetMetaInfo > &targets_metainfo) const
const std::vector< TargetMetaInfo > & getOutputMetainfo() const

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void RelAlgExecutor::executeUpdate ( const RelAlgNode * node,
const CompilationOptions & co,
const ExecutionOptions & eo,
const int64_t queue_time_ms
)
private

Definition at line 1633 of file RelAlgExecutor.cpp.

References CompilationOptions::allow_lazy_fetch, cat_, CHECK, CHECK_EQ, createCompoundWorkUnit(), createProjectWorkUnit(), DEBUG_TIMER, Default, dml_transaction_parameters_, executor_, CompilationOptions::filter_on_deleted_column, get_table_infos(), get_temporary_table(), QueryExecutionError::getErrorCode(), getErrorMessageFromCode(), Catalog_Namespace::Catalog::getMetadataForColumn(), CompilationOptions::hoist_literals, CacheInvalidator< CACHE_HOLDING_TYPES >::invalidateCaches(), CompilationOptions::makeCpuOnly(), ExecutionOptions::output_columnar_hint, post_execution_callback_, temporary_tables_, and StorageIOFacility::yieldUpdateCallback().

Referenced by executeRelAlgStep().

1636  {
1637  CHECK(node);
1638  auto timer = DEBUG_TIMER(__func__);
1639 
1641 
1642  auto co = co_in;
1643  co.hoist_literals = false; // disable literal hoisting as it interferes with dict
1644  // encoded string updates
1645 
1646  auto execute_update_for_node = [this, &co, &eo_in](const auto node,
1647  auto& work_unit,
1648  const bool is_aggregate) {
1649  auto table_descriptor = node->getModifiedTableDescriptor();
1650  CHECK(table_descriptor);
1651  if (node->isVarlenUpdateRequired() && !table_descriptor->hasDeletedCol) {
1652  throw std::runtime_error(
1653  "UPDATE queries involving variable length columns are only supported on tables "
1654  "with the vacuum attribute set to 'delayed'");
1655  }
1656  auto updated_table_desc = node->getModifiedTableDescriptor();
1657  dml_transaction_parameters_ =
1658  std::make_unique<UpdateTransactionParameters>(updated_table_desc,
1659  node->getTargetColumns(),
1660  node->getOutputMetainfo(),
1661  node->isVarlenUpdateRequired());
1662 
1663  const auto table_infos = get_table_infos(work_unit.exe_unit, executor_);
1664 
1665  auto execute_update_ra_exe_unit =
1666  [this, &co, &eo_in, &table_infos, &updated_table_desc](
1667  const RelAlgExecutionUnit& ra_exe_unit, const bool is_aggregate) {
1668  CompilationOptions co_project = CompilationOptions::makeCpuOnly(co);
1669 
1670  auto eo = eo_in;
1671  if (dml_transaction_parameters_->tableIsTemporary()) {
1672  eo.output_columnar_hint = true;
1673  co_project.allow_lazy_fetch = false;
1674  co_project.filter_on_deleted_column =
1675  false; // project the entire delete column for columnar update
1676  }
1677 
1678  auto update_transaction_parameters = dynamic_cast<UpdateTransactionParameters*>(
1679  dml_transaction_parameters_.get());
1680  CHECK(update_transaction_parameters);
1681  auto update_callback = yieldUpdateCallback(*update_transaction_parameters);
1682  try {
1683  auto table_update_metadata =
1684  executor_->executeUpdate(ra_exe_unit,
1685  table_infos,
1686  updated_table_desc,
1687  co_project,
1688  eo,
1689  cat_,
1690  executor_->row_set_mem_owner_,
1691  update_callback,
1692  is_aggregate);
1693  post_execution_callback_ = [table_update_metadata, this]() {
1694  dml_transaction_parameters_->finalizeTransaction(cat_);
1695  TableOptimizer table_optimizer{
1696  dml_transaction_parameters_->getTableDescriptor(), executor_, cat_};
1697  table_optimizer.vacuumFragmentsAboveMinSelectivity(table_update_metadata);
1698  };
1699  } catch (const QueryExecutionError& e) {
1700  throw std::runtime_error(getErrorMessageFromCode(e.getErrorCode()));
1701  }
1702  };
1703 
1704  if (dml_transaction_parameters_->tableIsTemporary()) {
1705  // hold owned target exprs during execution if rewriting
1706  auto query_rewrite = std::make_unique<QueryRewriter>(table_infos, executor_);
1707  // rewrite temp table updates to generate the full column by moving the where
1708  // clause into a case if such a rewrite is not possible, bail on the update
1709  // operation build an expr for the update target
1710  auto update_transaction_params =
1711  dynamic_cast<UpdateTransactionParameters*>(dml_transaction_parameters_.get());
1712  CHECK(update_transaction_params);
1713  const auto td = update_transaction_params->getTableDescriptor();
1714  CHECK(td);
1715  const auto update_column_names = update_transaction_params->getUpdateColumnNames();
1716  if (update_column_names.size() > 1) {
1717  throw std::runtime_error(
1718  "Multi-column update is not yet supported for temporary tables.");
1719  }
1720 
1721  auto cd = cat_.getMetadataForColumn(td->tableId, update_column_names.front());
1722  CHECK(cd);
1723  auto projected_column_to_update =
1724  makeExpr<Analyzer::ColumnVar>(cd->columnType, td->tableId, cd->columnId, 0);
1725  const auto rewritten_exe_unit = query_rewrite->rewriteColumnarUpdate(
1726  work_unit.exe_unit, projected_column_to_update);
1727  if (rewritten_exe_unit.target_exprs.front()->get_type_info().is_varlen()) {
1728  throw std::runtime_error(
1729  "Variable length updates not yet supported on temporary tables.");
1730  }
1731  execute_update_ra_exe_unit(rewritten_exe_unit, is_aggregate);
1732  } else {
1733  execute_update_ra_exe_unit(work_unit.exe_unit, is_aggregate);
1734  }
1735  };
1736 
1737  if (auto compound = dynamic_cast<const RelCompound*>(node)) {
1738  auto work_unit =
1739  createCompoundWorkUnit(compound, {{}, SortAlgorithm::Default, 0, 0}, eo_in);
1740 
1741  execute_update_for_node(compound, work_unit, compound->isAggregate());
1742  } else if (auto project = dynamic_cast<const RelProject*>(node)) {
1743  auto work_unit =
1744  createProjectWorkUnit(project, {{}, SortAlgorithm::Default, 0, 0}, eo_in);
1745 
1746  if (project->isSimple()) {
1747  CHECK_EQ(size_t(1), project->inputCount());
1748  const auto input_ra = project->getInput(0);
1749  if (dynamic_cast<const RelSort*>(input_ra)) {
1750  const auto& input_table =
1751  get_temporary_table(&temporary_tables_, -input_ra->getId());
1752  CHECK(input_table);
1753  work_unit.exe_unit.scan_limit = input_table->rowCount();
1754  }
1755  }
1756 
1757  execute_update_for_node(project, work_unit, false);
1758  } else {
1759  throw std::runtime_error("Unsupported parent node for update: " + node->toString());
1760  }
1761 }
#define CHECK_EQ(x, y)
Definition: Logger.h:219
std::optional< std::function< void()> > post_execution_callback_
int32_t getErrorCode() const
Definition: ErrorHandling.h:55
WorkUnit createProjectWorkUnit(const RelProject *, const SortInfo &, const ExecutionOptions &eo)
TemporaryTables temporary_tables_
Driver for running cleanup processes on a table. TableOptimizer provides functions for various cleanu...
static void invalidateCaches()
WorkUnit createCompoundWorkUnit(const RelCompound *, const SortInfo &, const ExecutionOptions &eo)
const ResultSetPtr & get_temporary_table(const TemporaryTables *temporary_tables, const int table_id)
Definition: Execute.h:228
StorageIOFacility::UpdateCallback yieldUpdateCallback(UpdateTransactionParameters &update_parameters)
static CompilationOptions makeCpuOnly(const CompilationOptions &in)
const Catalog_Namespace::Catalog & cat_
const ColumnDescriptor * getMetadataForColumn(int tableId, const std::string &colName) const
#define CHECK(condition)
Definition: Logger.h:211
std::vector< InputTableInfo > get_table_infos(const std::vector< InputDescriptor > &input_descs, Executor *executor)
#define DEBUG_TIMER(name)
Definition: Logger.h:358
static std::string getErrorMessageFromCode(const int32_t error_code)
std::unique_ptr< TransactionParameters > dml_transaction_parameters_
Executor * executor_

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

ExecutionResult RelAlgExecutor::executeWorkUnit ( const WorkUnit & work_unit,
const std::vector< TargetMetaInfo > & targets_meta,
const bool is_agg,
const CompilationOptions & co_in,
const ExecutionOptions & eo_in,
RenderInfo * render_info,
const int64_t queue_time_ms,
const std::optional< size_t > previous_count = std::nullopt
)
private

Definition at line 3022 of file RelAlgExecutor.cpp.

References CompilationOptions::allow_lazy_fetch, RelAlgExecutor::WorkUnit::body, anonymous_namespace{RelAlgExecutor.cpp}::build_render_targets(), anonymous_namespace{RelAlgExecutor.cpp}::can_use_bump_allocator(), cat_, CHECK, CHECK_EQ, CHECK_GT, anonymous_namespace{RelAlgExecutor.cpp}::compute_output_buffer_size(), computeWindow(), CPU, DEBUG_TIMER, anonymous_namespace{RelAlgExecutor.cpp}::decide_approx_count_distinct_implementation(), RegisteredQueryHint::defaults(), CompilationOptions::device_type, RelAlgExecutor::WorkUnit::exe_unit, anonymous_namespace{RelAlgExecutor.cpp}::exe_unit_has_quals(), executor_, ExecutionOptions::executor_type, Extern, ExecutionOptions::find_push_down_candidates, g_big_group_threshold, g_columnar_large_projections, g_columnar_large_projections_threshold, g_enable_window_functions, g_estimator_failure_max_groupby_size, get_table_infos(), QueryExecutionError::getErrorCode(), getFilteredCountAll(), getNDVEstimation(), anonymous_namespace{RelAlgExecutor.cpp}::groups_approx_upper_bound(), handleOutOfMemoryRetry(), handlePersistentError(), INJECT_TIMER, anonymous_namespace{RelAlgExecutor.cpp}::is_agg(), anonymous_namespace{RelAlgExecutor.cpp}::is_window_execution_unit(), RenderInfo::isPotentialInSituRender(), isRowidLookup(), ExecutionOptions::just_calcite_explain, ExecutionOptions::just_explain, ExecutionOptions::just_validate, leaf_results_, RelAlgExecutor::WorkUnit::max_groups_buffer_entry_guess, ExecutionOptions::output_columnar_hint, query_dag_, ra_exec_unit_desc_for_caching(), CardinalityEstimationRequired::range(), run_benchmark_import::result, selectFiltersToBePushedDown(), anonymous_namespace{RelAlgExecutor.cpp}::should_output_columnar(), RelAlgExecutionUnit::target_exprs, target_exprs_owned_, VLOG, and QueryExecutionError::wasMultifragKernelLaunch().

Referenced by executeAggregate(), executeCompound(), executeFilter(), executeProject(), executeSort(), and executeUnion().

3030  {
3032  auto timer = DEBUG_TIMER(__func__);
3033 
3034  auto co = co_in;
3035  auto eo = eo_in;
3036  ColumnCacheMap column_cache;
3037  if (is_window_execution_unit(work_unit.exe_unit)) {
3038  if (!g_enable_window_functions) {
3039  throw std::runtime_error("Window functions support is disabled");
3040  }
3041  co.device_type = ExecutorDeviceType::CPU;
3042  co.allow_lazy_fetch = false;
3043  computeWindow(work_unit.exe_unit, co, eo, column_cache, queue_time_ms);
3044  }
3045  if (!eo.just_explain && eo.find_push_down_candidates) {
3046  // find potential candidates:
3047  auto selected_filters = selectFiltersToBePushedDown(work_unit, co, eo);
3048  if (!selected_filters.empty() || eo.just_calcite_explain) {
3049  return ExecutionResult(selected_filters, eo.find_push_down_candidates);
3050  }
3051  }
3052  if (render_info && render_info->isPotentialInSituRender()) {
3053  co.allow_lazy_fetch = false;
3054  }
3055  const auto body = work_unit.body;
3056  CHECK(body);
3057  auto it = leaf_results_.find(body->getId());
3058  VLOG(3) << "body->getId()=" << body->getId() << " body->toString()=" << body->toString()
3059  << " it==leaf_results_.end()=" << (it == leaf_results_.end());
3060  if (it != leaf_results_.end()) {
3061  executor_->addTransientStringLiterals(work_unit.exe_unit,
3062  executor_->row_set_mem_owner_);
3063  auto& aggregated_result = it->second;
3064  auto& result_rows = aggregated_result.rs;
3065  ExecutionResult result(result_rows, aggregated_result.targets_meta);
3066  body->setOutputMetainfo(aggregated_result.targets_meta);
3067  if (render_info) {
3068  build_render_targets(*render_info, work_unit.exe_unit.target_exprs, targets_meta);
3069  }
3070  return result;
3071  }
3072  const auto table_infos = get_table_infos(work_unit.exe_unit, executor_);
3073 
3074  auto ra_exe_unit = decide_approx_count_distinct_implementation(
3075  work_unit.exe_unit, table_infos, executor_, co.device_type, target_exprs_owned_);
3076 
3077  // register query hint if query_dag_ is valid
3078  ra_exe_unit.query_hint = RegisteredQueryHint::defaults();
3079  if (query_dag_) {
3080  auto candidate = query_dag_->getQueryHint(body);
3081  if (candidate) {
3082  ra_exe_unit.query_hint = *candidate;
3083  }
3084  }
3085  auto max_groups_buffer_entry_guess = work_unit.max_groups_buffer_entry_guess;
3086  if (is_window_execution_unit(ra_exe_unit)) {
3087  CHECK_EQ(table_infos.size(), size_t(1));
3088  CHECK_EQ(table_infos.front().info.fragments.size(), size_t(1));
3089  max_groups_buffer_entry_guess =
3090  table_infos.front().info.fragments.front().getNumTuples();
3091  ra_exe_unit.scan_limit = max_groups_buffer_entry_guess;
3092  } else if (compute_output_buffer_size(ra_exe_unit) && !isRowidLookup(work_unit)) {
3093  if (previous_count && !exe_unit_has_quals(ra_exe_unit)) {
3094  ra_exe_unit.scan_limit = *previous_count;
3095  } else {
3096  // TODO(adb): enable bump allocator path for render queries
3097  if (can_use_bump_allocator(ra_exe_unit, co, eo) && !render_info) {
3098  ra_exe_unit.scan_limit = 0;
3099  ra_exe_unit.use_bump_allocator = true;
3100  } else if (eo.executor_type == ::ExecutorType::Extern) {
3101  ra_exe_unit.scan_limit = 0;
3102  } else if (!eo.just_explain) {
3103  const auto filter_count_all = getFilteredCountAll(work_unit, true, co, eo);
3104  if (filter_count_all) {
3105  ra_exe_unit.scan_limit = std::max(*filter_count_all, size_t(1));
3106  }
3107  }
3108  }
3109  }
3110 
3112  const auto prefer_columnar = should_output_columnar(ra_exe_unit, render_info);
3113  if (prefer_columnar) {
3114  VLOG(1) << "Using columnar layout for projection as output size of "
3115  << ra_exe_unit.scan_limit << " rows exceeds threshold of "
3116  <<