OmniSciDB  c0231cc57d
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
RelAlgExecutor Class Reference

#include <RelAlgExecutor.h>

+ Inheritance diagram for RelAlgExecutor:
+ Collaboration diagram for RelAlgExecutor:

Classes

struct  TableFunctionWorkUnit
 
struct  WorkUnit
 

Public Types

using TargetInfoList = std::vector< TargetInfo >
 

Public Member Functions

 RelAlgExecutor (Executor *executor, Catalog_Namespace::Catalog &cat, std::shared_ptr< const query_state::QueryState > query_state=nullptr)
 
 RelAlgExecutor (Executor *executor, Catalog_Namespace::Catalog &cat, const std::string &query_ra, std::shared_ptr< const query_state::QueryState > query_state=nullptr)
 
 RelAlgExecutor (Executor *executor, Catalog_Namespace::Catalog &cat, std::unique_ptr< RelAlgDag > query_dag, std::shared_ptr< const query_state::QueryState > query_state=nullptr)
 
size_t getOuterFragmentCount (const CompilationOptions &co, const ExecutionOptions &eo)
 
ExecutionResult executeRelAlgQuery (const CompilationOptions &co, const ExecutionOptions &eo, const bool just_explain_plan, RenderInfo *render_info)
 
ExecutionResult executeRelAlgQueryWithFilterPushDown (const RaExecutionSequence &seq, const CompilationOptions &co, const ExecutionOptions &eo, RenderInfo *render_info, const int64_t queue_time_ms)
 
void prepareLeafExecution (const AggregatedColRange &agg_col_range, const StringDictionaryGenerations &string_dictionary_generations, const TableGenerations &table_generations)
 
ExecutionResult executeRelAlgSeq (const RaExecutionSequence &seq, const CompilationOptions &co, const ExecutionOptions &eo, RenderInfo *render_info, const int64_t queue_time_ms, const bool with_existing_temp_tables=false)
 
ExecutionResult executeRelAlgSubSeq (const RaExecutionSequence &seq, const std::pair< size_t, size_t > interval, const CompilationOptions &co, const ExecutionOptions &eo, RenderInfo *render_info, const int64_t queue_time_ms)
 
QueryStepExecutionResult executeRelAlgQuerySingleStep (const RaExecutionSequence &seq, const size_t step_idx, const CompilationOptions &co, const ExecutionOptions &eo, RenderInfo *render_info)
 
void addLeafResult (const unsigned id, const AggregatedResult &result)
 
const RelAlgNode & getRootRelAlgNode () const
 
void prepareForeignTables ()
 
std::shared_ptr< const RelAlgNode > getRootRelAlgNodeShPtr () const
 
std::pair< std::vector
< unsigned >
, std::unordered_map< unsigned,
JoinQualsPerNestingLevel > > 
getJoinInfo (const RelAlgNode *root_node)
 
std::shared_ptr< RelAlgTranslator > getRelAlgTranslator (const RelAlgNode *root_node)
 
const std::vector
< std::shared_ptr< RexSubQuery > > & 
getSubqueries () const noexcept
 
std::optional
< RegisteredQueryHint > 
getParsedQueryHint (const RelAlgNode *node)
 
std::optional
< std::unordered_map< size_t,
std::unordered_map< unsigned,
RegisteredQueryHint > > > 
getParsedQueryHints ()
 
std::optional
< RegisteredQueryHint > 
getGlobalQueryHint ()
 
ExecutionResult executeSimpleInsert (const Analyzer::Query &insert_query, Fragmenter_Namespace::InsertDataLoader &inserter, const Catalog_Namespace::SessionInfo &session)
 
AggregatedColRange computeColRangesCache ()
 
StringDictionaryGenerations computeStringDictionaryGenerations ()
 
TableGenerations computeTableGenerations ()
 
Executor * getExecutor () const
 
void cleanupPostExecution ()
 
void executePostExecutionCallback ()
 
RaExecutionSequence getRaExecutionSequence (const RelAlgNode *root_node, Executor *executor)
 
void prepareForeignTable ()
 
std::unordered_set< int > getPhysicalTableIds () const
 
void prepareForSystemTableExecution (const CompilationOptions &co) const
 

Static Public Member Functions

static std::string getErrorMessageFromCode (const int32_t error_code)
 

Private Member Functions

void initializeParallelismHints ()
 
ExecutionResult executeRelAlgQueryNoRetry (const CompilationOptions &co, const ExecutionOptions &eo, const bool just_explain_plan, RenderInfo *render_info)
 
void executeRelAlgStep (const RaExecutionSequence &seq, const size_t step_idx, const CompilationOptions &, const ExecutionOptions &, RenderInfo *, const int64_t queue_time_ms)
 
void executeUpdate (const RelAlgNode *node, const CompilationOptions &co, const ExecutionOptions &eo, const int64_t queue_time_ms)
 
void executeDelete (const RelAlgNode *node, const CompilationOptions &co, const ExecutionOptions &eo_in, const int64_t queue_time_ms)
 
ExecutionResult executeCompound (const RelCompound *, const CompilationOptions &, const ExecutionOptions &, RenderInfo *, const int64_t queue_time_ms)
 
ExecutionResult executeAggregate (const RelAggregate *aggregate, const CompilationOptions &co, const ExecutionOptions &eo, RenderInfo *render_info, const int64_t queue_time_ms)
 
ExecutionResult executeProject (const RelProject *, const CompilationOptions &, const ExecutionOptions &, RenderInfo *, const int64_t queue_time_ms, const std::optional< size_t > previous_count)
 
ExecutionResult executeTableFunction (const RelTableFunction *, const CompilationOptions &, const ExecutionOptions &, const int64_t queue_time_ms)
 
ExecutionResult executeFilter (const RelFilter *, const CompilationOptions &, const ExecutionOptions &, RenderInfo *, const int64_t queue_time_ms)
 
ExecutionResult executeSort (const RelSort *, const CompilationOptions &, const ExecutionOptions &, RenderInfo *, const int64_t queue_time_ms)
 
ExecutionResult executeLogicalValues (const RelLogicalValues *, const ExecutionOptions &)
 
ExecutionResult executeModify (const RelModify *modify, const ExecutionOptions &eo)
 
ExecutionResult executeUnion (const RelLogicalUnion *, const RaExecutionSequence &, const CompilationOptions &, const ExecutionOptions &, RenderInfo *, const int64_t queue_time_ms)
 
WorkUnit createSortInputWorkUnit (const RelSort *, std::list< Analyzer::OrderEntry > &order_entries, const ExecutionOptions &eo)
 
ExecutionResult executeWorkUnit (const WorkUnit &work_unit, const std::vector< TargetMetaInfo > &targets_meta, const bool is_agg, const CompilationOptions &co_in, const ExecutionOptions &eo_in, RenderInfo *, const int64_t queue_time_ms, const std::optional< size_t > previous_count=std::nullopt)
 
void computeWindow (const WorkUnit &work_unit, const CompilationOptions &co, const ExecutionOptions &eo, ColumnCacheMap &column_cache_map, const int64_t queue_time_ms)
 
std::unique_ptr
< WindowFunctionContext > 
createWindowFunctionContext (const Analyzer::WindowFunction *window_func, const std::shared_ptr< Analyzer::BinOper > &partition_key_cond, std::unordered_map< QueryPlanHash, std::shared_ptr< HashJoin >> &partition_cache, std::unordered_map< QueryPlanHash, size_t > &sorted_partition_key_ref_count_map, const WorkUnit &work_unit, const std::vector< InputTableInfo > &query_infos, const CompilationOptions &co, ColumnCacheMap &column_cache_map, std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner)
 
size_t getNDVEstimation (const WorkUnit &work_unit, const int64_t range, const bool is_agg, const CompilationOptions &co, const ExecutionOptions &eo)
 
std::optional< size_t > getFilteredCountAll (const WorkUnit &work_unit, const bool is_agg, const CompilationOptions &co, const ExecutionOptions &eo)
 
FilterSelectivity getFilterSelectivity (const std::vector< std::shared_ptr< Analyzer::Expr >> &filter_expressions, const CompilationOptions &co, const ExecutionOptions &eo)
 
std::vector< PushedDownFilterInfo > selectFiltersToBePushedDown (const RelAlgExecutor::WorkUnit &work_unit, const CompilationOptions &co, const ExecutionOptions &eo)
 
bool isRowidLookup (const WorkUnit &work_unit)
 
ExecutionResult handleOutOfMemoryRetry (const RelAlgExecutor::WorkUnit &work_unit, const std::vector< TargetMetaInfo > &targets_meta, const bool is_agg, const CompilationOptions &co, const ExecutionOptions &eo, RenderInfo *render_info, const bool was_multifrag_kernel_launch, const int64_t queue_time_ms)
 
WorkUnit createWorkUnit (const RelAlgNode *, const SortInfo &, const ExecutionOptions &eo)
 
WorkUnit createCompoundWorkUnit (const RelCompound *, const SortInfo &, const ExecutionOptions &eo)
 
WorkUnit createAggregateWorkUnit (const RelAggregate *, const SortInfo &, const bool just_explain)
 
WorkUnit createProjectWorkUnit (const RelProject *, const SortInfo &, const ExecutionOptions &eo)
 
WorkUnit createFilterWorkUnit (const RelFilter *, const SortInfo &, const bool just_explain)
 
WorkUnit createJoinWorkUnit (const RelJoin *, const SortInfo &, const bool just_explain)
 
WorkUnit createUnionWorkUnit (const RelLogicalUnion *, const SortInfo &, const ExecutionOptions &eo)
 
TableFunctionWorkUnit createTableFunctionWorkUnit (const RelTableFunction *table_func, const bool just_explain, const bool is_gpu)
 
void addTemporaryTable (const int table_id, const ResultSetPtr &result)
 
void eraseFromTemporaryTables (const int table_id)
 
void handleNop (RaExecutionDesc &ed)
 
std::unordered_map< unsigned,
JoinQualsPerNestingLevel > & 
getLeftDeepJoinTreesInfo ()
 
JoinQualsPerNestingLevel translateLeftDeepJoinFilter (const RelLeftDeepInnerJoin *join, const std::vector< InputDescriptor > &input_descs, const std::unordered_map< const RelAlgNode *, int > &input_to_nest_level, const bool just_explain)
 
std::list< std::shared_ptr
< Analyzer::Expr > > 
makeJoinQuals (const RexScalar *join_condition, const std::vector< JoinType > &join_types, const std::unordered_map< const RelAlgNode *, int > &input_to_nest_level, const bool just_explain) const
 
void setHasStepForUnion (bool flag)
 
bool hasStepForUnion () const
 
bool canUseResultsetCache (const ExecutionOptions &eo, RenderInfo *render_info) const
 
void setupCaching (const RelAlgNode *ra)
 
- Private Member Functions inherited from StorageIOFacility
 StorageIOFacility (Executor *executor, Catalog_Namespace::Catalog const &catalog)
 
StorageIOFacility::UpdateCallback yieldUpdateCallback (UpdateTransactionParameters &update_parameters)
 
StorageIOFacility::UpdateCallback yieldDeleteCallback (DeleteTransactionParameters &delete_parameters)
 

Static Private Member Functions

static void handlePersistentError (const int32_t error_code)
 

Private Attributes

Executor * executor_
 
Catalog_Namespace::Catalog & cat_
 
std::unique_ptr< RelAlgDag > query_dag_
 
std::shared_ptr< const
query_state::QueryState > 
query_state_
 
TemporaryTables temporary_tables_
 
time_t now_
 
std::unordered_map< unsigned,
JoinQualsPerNestingLevel > 
left_deep_join_info_
 
std::vector< std::shared_ptr
< Analyzer::Expr > > 
target_exprs_owned_
 
std::unordered_map< unsigned,
AggregatedResult > 
leaf_results_
 
int64_t queue_time_ms_
 
bool has_step_for_union_
 
std::unique_ptr
< TransactionParameters > 
dml_transaction_parameters_
 
std::optional< std::function
< void()> > 
post_execution_callback_
 

Static Private Attributes

static SpeculativeTopNBlacklist speculative_topn_blacklist_
 

Friends

class PendingExecutionClosure
 

Additional Inherited Members

- Private Types inherited from StorageIOFacility
using UpdateCallback = UpdateLogForFragment::Callback
 
using TableDescriptorType = TableDescriptor
 
using DeleteVictimOffsetList = std::vector< uint64_t >
 
using UpdateTargetOffsetList = std::vector< uint64_t >
 
using UpdateTargetTypeList = std::vector< TargetMetaInfo >
 
using UpdateTargetColumnNamesList = std::vector< std::string >
 
using TransactionLog = Fragmenter_Namespace::InsertOrderFragmenter::ModifyTransactionTracker
 
using TransactionLogPtr = std::unique_ptr< TransactionLog >
 
using ColumnValidationFunction = std::function< bool(std::string const &)>
 

Detailed Description

Definition at line 52 of file RelAlgExecutor.h.

Member Typedef Documentation

using RelAlgExecutor::TargetInfoList = std::vector<TargetInfo>

Definition at line 54 of file RelAlgExecutor.h.

Constructor & Destructor Documentation

RelAlgExecutor::RelAlgExecutor ( Executor * executor,
Catalog_Namespace::Catalog & cat,
std::shared_ptr< const query_state::QueryState > query_state = nullptr 
)
inline

Definition at line 56 of file RelAlgExecutor.h.

References initializeParallelismHints().

59  : StorageIOFacility(executor, cat)
60  , executor_(executor)
61  , cat_(cat)
62  , query_state_(std::move(query_state))
63  , now_(0)
64  , queue_time_ms_(0) {
65  initializeParallelismHints();
66  }
int64_t queue_time_ms_
StorageIOFacility(Executor *executor, Catalog_Namespace::Catalog const &catalog)
std::shared_ptr< const query_state::QueryState > query_state_
Catalog_Namespace::Catalog & cat_
void initializeParallelismHints()
Executor * executor_

+ Here is the call graph for this function:

RelAlgExecutor::RelAlgExecutor ( Executor * executor,
Catalog_Namespace::Catalog & cat,
const std::string &  query_ra,
std::shared_ptr< const query_state::QueryState > query_state = nullptr 
)
inline

Definition at line 68 of file RelAlgExecutor.h.

References initializeParallelismHints().

72  : StorageIOFacility(executor, cat)
73  , executor_(executor)
74  , cat_(cat)
75  , query_dag_(RelAlgDagBuilder::buildDag(query_ra, cat, true))
76  , query_state_(std::move(query_state))
77  , now_(0)
78  , queue_time_ms_(0) {
79  initializeParallelismHints();
80  }
int64_t queue_time_ms_
StorageIOFacility(Executor *executor, Catalog_Namespace::Catalog const &catalog)
std::unique_ptr< RelAlgDag > query_dag_
static std::unique_ptr< RelAlgDag > buildDag(const std::string &query_ra, const Catalog_Namespace::Catalog &cat, const bool optimize_dag)
Definition: RelAlgDag.cpp:3102
std::shared_ptr< const query_state::QueryState > query_state_
Catalog_Namespace::Catalog & cat_
void initializeParallelismHints()
Executor * executor_

+ Here is the call graph for this function:

RelAlgExecutor::RelAlgExecutor ( Executor * executor,
Catalog_Namespace::Catalog & cat,
std::unique_ptr< RelAlgDag > query_dag,
std::shared_ptr< const query_state::QueryState > query_state = nullptr 
)
inline

Definition at line 82 of file RelAlgExecutor.h.

References initializeParallelismHints().

86  : StorageIOFacility(executor, cat)
87  , executor_(executor)
88  , cat_(cat)
89  , query_dag_(std::move(query_dag))
90  , query_state_(std::move(query_state))
91  , now_(0)
92  , queue_time_ms_(0) {
93  initializeParallelismHints();
94  }
int64_t queue_time_ms_
StorageIOFacility(Executor *executor, Catalog_Namespace::Catalog const &catalog)
std::unique_ptr< RelAlgDag > query_dag_
std::shared_ptr< const query_state::QueryState > query_state_
Catalog_Namespace::Catalog & cat_
void initializeParallelismHints()
Executor * executor_

+ Here is the call graph for this function:

Member Function Documentation

void RelAlgExecutor::addLeafResult ( const unsigned  id,
const AggregatedResult & result 
)
inline

Definition at line 134 of file RelAlgExecutor.h.

References CHECK, and leaf_results_.

134  {
135  const auto it_ok = leaf_results_.emplace(id, result);
136  CHECK(it_ok.second);
137  }
std::unordered_map< unsigned, AggregatedResult > leaf_results_
#define CHECK(condition)
Definition: Logger.h:222
void RelAlgExecutor::addTemporaryTable ( const int  table_id,
const ResultSetPtr & result 
)
inlineprivate

Definition at line 390 of file RelAlgExecutor.h.

References CHECK, CHECK_LT, and temporary_tables_.

Referenced by executeRelAlgSeq(), executeRelAlgStep(), executeSort(), executeTableFunction(), and handleNop().

390  {
391  CHECK_LT(size_t(0), result->colCount());
392  CHECK_LT(table_id, 0);
393  auto it_ok = temporary_tables_.emplace(table_id, result);
394  CHECK(it_ok.second);
395  }
TemporaryTables temporary_tables_
#define CHECK_LT(x, y)
Definition: Logger.h:232
#define CHECK(condition)
Definition: Logger.h:222

+ Here is the caller graph for this function:

bool RelAlgExecutor::canUseResultsetCache ( const ExecutionOptions & eo,
RenderInfo * render_info 
) const
private

Definition at line 455 of file RelAlgExecutor.cpp.

References g_cluster, g_enable_data_recycler, g_use_query_resultset_cache, hasStepForUnion(), anonymous_namespace{RelAlgExecutor.cpp}::is_validate_or_explain_query(), heavyai::InSituFlagsOwnerInterface::isInSitu(), and ExecutionOptions::outer_fragment_indices.

Referenced by executeRelAlgStep(), executeSort(), executeTableFunction(), and executeWorkUnit().

456  {
457  auto validate_or_explain_query = is_validate_or_explain_query(eo);
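458  auto query_for_partial_outer_frag = !eo.outer_fragment_indices.empty();
459  [line omitted from this listing: it begins the `return` expression continued below; per the References list above it involves g_use_query_resultset_cache, g_enable_data_recycler and g_cluster]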
460  !validate_or_explain_query && !hasStepForUnion() &&
461  !query_for_partial_outer_frag &&
462  (!render_info || (render_info && !render_info->isInSitu()));
463 }
bool g_use_query_resultset_cache
Definition: Execute.cpp:148
std::vector< size_t > outer_fragment_indices
bool g_enable_data_recycler
Definition: Execute.cpp:146
bool hasStepForUnion() const
bool is_validate_or_explain_query(const ExecutionOptions &eo)
bool g_cluster

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void RelAlgExecutor::cleanupPostExecution ( )

Definition at line 766 of file RelAlgExecutor.cpp.

References CHECK, and executor_.

Referenced by executeRelAlgQueryNoRetry(), and getOuterFragmentCount().

766  {
767  CHECK(executor_);
768  executor_->row_set_mem_owner_ = nullptr;
769 }
#define CHECK(condition)
Definition: Logger.h:222
Executor * executor_

+ Here is the caller graph for this function:

AggregatedColRange RelAlgExecutor::computeColRangesCache ( )

Definition at line 746 of file RelAlgExecutor.cpp.

References cat_, executor_, get_physical_inputs(), and getRootRelAlgNode().

746  {
747  AggregatedColRange agg_col_range_cache;
748  const auto phys_inputs = get_physical_inputs(cat_, &getRootRelAlgNode());
749  return executor_->computeColRangesCache(phys_inputs);
750 }
Catalog_Namespace::Catalog & cat_
const RelAlgNode & getRootRelAlgNode() const
std::unordered_set< PhysicalInput > get_physical_inputs(const RelAlgNode *ra)
Executor * executor_

+ Here is the call graph for this function:

StringDictionaryGenerations RelAlgExecutor::computeStringDictionaryGenerations ( )

Definition at line 752 of file RelAlgExecutor.cpp.

References cat_, executor_, get_physical_inputs(), and getRootRelAlgNode().

752  {
753  const auto phys_inputs = get_physical_inputs(cat_, &getRootRelAlgNode());
754  return executor_->computeStringDictionaryGenerations(phys_inputs);
755 }
Catalog_Namespace::Catalog & cat_
const RelAlgNode & getRootRelAlgNode() const
std::unordered_set< PhysicalInput > get_physical_inputs(const RelAlgNode *ra)
Executor * executor_

+ Here is the call graph for this function:

TableGenerations RelAlgExecutor::computeTableGenerations ( )

Definition at line 757 of file RelAlgExecutor.cpp.

References executor_, get_physical_table_inputs(), and getRootRelAlgNode().

757  {
758  const auto phys_table_ids = get_physical_table_inputs(&getRootRelAlgNode());
759  return executor_->computeTableGenerations(phys_table_ids);
760 }
const RelAlgNode & getRootRelAlgNode() const
Executor * executor_
std::unordered_set< int > get_physical_table_inputs(const RelAlgNode *ra)

+ Here is the call graph for this function:

void RelAlgExecutor::computeWindow ( const WorkUnit & work_unit,
const CompilationOptions & co,
const ExecutionOptions & eo,
ColumnCacheMap & column_cache_map,
const int64_t  queue_time_ms 
)
private

Definition at line 2440 of file RelAlgExecutor.cpp.

References CHECK, CHECK_EQ, WindowProjectNodeContext::create(), createWindowFunctionContext(), RelAlgExecutor::WorkUnit::exe_unit, executor_, ExecutionOptions::executor_type, Extern, get_table_infos(), Analyzer::WindowFunction::getPartitionKeys(), RelAlgExecutionUnit::input_descs, kBOOLEAN, kBW_EQ, kONE, RelAlgExecutionUnit::target_exprs, and anonymous_namespace{RelAlgExecutor.cpp}::transform_to_inner().

Referenced by executeUpdate(), and executeWorkUnit().

2444  {
2445  auto query_infos = get_table_infos(work_unit.exe_unit.input_descs, executor_);
2446  CHECK_EQ(query_infos.size(), size_t(1));
2447  if (query_infos.front().info.fragments.size() != 1) {
2448  throw std::runtime_error(
2449  "Only single fragment tables supported for window functions for now");
2450  }
2451  if (eo.executor_type == ::ExecutorType::Extern) {
2452  return;
2453  }
2454  query_infos.push_back(query_infos.front());
2455  auto window_project_node_context = WindowProjectNodeContext::create(executor_);
2456  // a query may hold multiple window functions having the same partition by condition
2457  // then after building the first hash partition we can reuse it for the rest of
2458  // the window functions
2459  // here, a cached partition can be shared via multiple window function contexts as is
2460  // but sorted partition should be copied to reuse since we use it for (intermediate)
2461  // output buffer
2462  // todo (yoonmin) : support recycler for window function computation?
2463  std::unordered_map<QueryPlanHash, std::shared_ptr<HashJoin>> partition_cache;
2464  std::unordered_map<QueryPlanHash, std::shared_ptr<std::vector<int64_t>>>
2465  sorted_partition_cache;
2466  std::unordered_map<QueryPlanHash, size_t> sorted_partition_key_ref_count_map;
2467  std::unordered_map<size_t, std::unique_ptr<WindowFunctionContext>>
2468  window_function_context_map;
2469  for (size_t target_index = 0; target_index < work_unit.exe_unit.target_exprs.size();
2470  ++target_index) {
2471  const auto& target_expr = work_unit.exe_unit.target_exprs[target_index];
2472  const auto window_func = dynamic_cast<const Analyzer::WindowFunction*>(target_expr);
2473  if (!window_func) {
2474  continue;
2475  }
2476  // Always use baseline layout hash tables for now, make the expression a tuple.
2477  const auto& partition_keys = window_func->getPartitionKeys();
2478  std::shared_ptr<Analyzer::BinOper> partition_key_cond;
2479  if (partition_keys.size() >= 1) {
2480  std::shared_ptr<Analyzer::Expr> partition_key_tuple;
2481  if (partition_keys.size() > 1) {
2482  partition_key_tuple = makeExpr<Analyzer::ExpressionTuple>(partition_keys);
2483  } else {
2484  CHECK_EQ(partition_keys.size(), size_t(1));
2485  partition_key_tuple = partition_keys.front();
2486  }
2487  // Creates a tautology equality with the partition expression on both sides.
2488  partition_key_cond =
2489  makeExpr<Analyzer::BinOper>(kBOOLEAN,
2490  kBW_EQ,
2491  kONE,
2492  partition_key_tuple,
2493  transform_to_inner(partition_key_tuple.get()));
2494  }
2495  auto context =
2496  createWindowFunctionContext(window_func,
2497  partition_key_cond /*nullptr if no partition key*/,
2498  partition_cache,
2499  sorted_partition_key_ref_count_map,
2500  work_unit,
2501  query_infos,
2502  co,
2503  column_cache_map,
2504  executor_->getRowSetMemoryOwner());
2505  CHECK(window_function_context_map.emplace(target_index, std::move(context)).second);
2506  }
2507 
2508  for (auto& kv : window_function_context_map) {
2509  kv.second->compute(sorted_partition_key_ref_count_map, sorted_partition_cache);
2510  window_project_node_context->addWindowFunctionContext(std::move(kv.second), kv.first);
2511  }
2512 }
#define CHECK_EQ(x, y)
Definition: Logger.h:230
static WindowProjectNodeContext * create(Executor *executor)
std::shared_ptr< Analyzer::Expr > transform_to_inner(const Analyzer::Expr *expr)
std::unique_ptr< WindowFunctionContext > createWindowFunctionContext(const Analyzer::WindowFunction *window_func, const std::shared_ptr< Analyzer::BinOper > &partition_key_cond, std::unordered_map< QueryPlanHash, std::shared_ptr< HashJoin >> &partition_cache, std::unordered_map< QueryPlanHash, size_t > &sorted_partition_key_ref_count_map, const WorkUnit &work_unit, const std::vector< InputTableInfo > &query_infos, const CompilationOptions &co, ColumnCacheMap &column_cache_map, std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner)
ExecutorType executor_type
Definition: sqldefs.h:70
#define CHECK(condition)
Definition: Logger.h:222
std::vector< InputTableInfo > get_table_infos(const std::vector< InputDescriptor > &input_descs, Executor *executor)
Definition: sqldefs.h:30
const std::vector< std::shared_ptr< Analyzer::Expr > > & getPartitionKeys() const
Definition: Analyzer.h:2410
Executor * executor_

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

RelAlgExecutor::WorkUnit RelAlgExecutor::createAggregateWorkUnit ( const RelAggregate * aggregate,
const SortInfo & sort_info,
const bool  just_explain 
)
private

Definition at line 4664 of file RelAlgExecutor.cpp.

References cat_, CHECK_EQ, RegisteredQueryHint::defaults(), executor_, QueryPlanDagExtractor::extractJoinInfo(), g_default_max_groups_buffer_entry_guess, anonymous_namespace{RelAlgExecutor.cpp}::get_input_desc(), anonymous_namespace{RelAlgExecutor.cpp}::get_input_nest_levels(), anonymous_namespace{RelAlgExecutor.cpp}::get_join_type(), get_table_infos(), anonymous_namespace{RelAlgExecutor.cpp}::get_targets_meta(), RelAlgNode::getInput(), getLeftDeepJoinTreesInfo(), RelAlgNode::getOutputMetainfo(), RelAlgNode::getQueryPlanDagHash(), RelAlgNode::inputCount(), now_, query_dag_, query_state_, RelAlgNode::setOutputMetainfo(), anonymous_namespace{RelAlgExecutor.cpp}::synthesize_inputs(), target_exprs_owned_, anonymous_namespace{RelAlgExecutor.cpp}::translate_groupby_exprs(), and anonymous_namespace{RelAlgExecutor.cpp}::translate_targets().

Referenced by createWorkUnit(), and executeAggregate().

4667  {
4668  std::vector<InputDescriptor> input_descs;
4669  std::list<std::shared_ptr<const InputColDescriptor>> input_col_descs;
4670  std::vector<std::shared_ptr<RexInput>> used_inputs_owned;
4671  const auto input_to_nest_level = get_input_nest_levels(aggregate, {});
4672  std::tie(input_descs, input_col_descs, used_inputs_owned) =
4673  get_input_desc(aggregate, input_to_nest_level, {}, cat_);
4674  const auto join_type = get_join_type(aggregate);
4675 
4676  RelAlgTranslator translator(cat_,
4677  query_state_,
4678  executor_,
4679  input_to_nest_level,
4680  {join_type},
4681  now_,
4682  just_explain);
4683  CHECK_EQ(size_t(1), aggregate->inputCount());
4684  const auto source = aggregate->getInput(0);
4685  const auto& in_metainfo = source->getOutputMetainfo();
4686  const auto scalar_sources =
4687  synthesize_inputs(aggregate, size_t(0), in_metainfo, input_to_nest_level);
4688  const auto groupby_exprs = translate_groupby_exprs(aggregate, scalar_sources);
4689  std::unordered_map<size_t, SQLTypeInfo> target_exprs_type_infos;
4690  const auto target_exprs = translate_targets(target_exprs_owned_,
4691  target_exprs_type_infos,
4692  scalar_sources,
4693  groupby_exprs,
4694  aggregate,
4695  translator);
4696 
4697  const auto query_infos = get_table_infos(input_descs, executor_);
4698 
4699  const auto targets_meta = get_targets_meta(aggregate, target_exprs);
4700  aggregate->setOutputMetainfo(targets_meta);
4701  auto query_hint = RegisteredQueryHint::defaults();
4702  if (query_dag_) {
4703  auto candidate = query_dag_->getQueryHint(aggregate);
4704  if (candidate) {
4705  query_hint = *candidate;
4706  }
4707  }
4708  auto join_info = QueryPlanDagExtractor::extractJoinInfo(
4709  aggregate, std::nullopt, getLeftDeepJoinTreesInfo(), executor_);
4710  return {RelAlgExecutionUnit{input_descs,
4711  input_col_descs,
4712  {},
4713  {},
4714  {},
4715  groupby_exprs,
4716  target_exprs,
4717  target_exprs_type_infos,
4718  nullptr,
4719  sort_info,
4720  0,
4721  query_hint,
4722  aggregate->getQueryPlanDagHash(),
4723  join_info.hash_table_plan_dag,
4724  join_info.table_id_to_node_map,
4725  false,
4726  std::nullopt,
4727  query_state_},
4728  aggregate,
4729  g_default_max_groups_buffer_entry_guess,
4730  nullptr};
4731 }
#define CHECK_EQ(x, y)
Definition: Logger.h:230
std::unordered_map< const RelAlgNode *, int > get_input_nest_levels(const RelAlgNode *ra_node, const std::vector< size_t > &input_permutation)
static ExtractedJoinInfo extractJoinInfo(const RelAlgNode *top_node, std::optional< unsigned > left_deep_tree_id, std::unordered_map< unsigned, JoinQualsPerNestingLevel > left_deep_tree_infos, Executor *executor)
std::vector< std::shared_ptr< Analyzer::Expr > > synthesize_inputs(const RelAlgNode *ra_node, const size_t nest_level, const std::vector< TargetMetaInfo > &in_metainfo, const std::unordered_map< const RelAlgNode *, int > &input_to_nest_level)
std::unordered_map< unsigned, JoinQualsPerNestingLevel > & getLeftDeepJoinTreesInfo()
size_t getQueryPlanDagHash() const
Definition: RelAlgDag.h:808
std::vector< std::shared_ptr< Analyzer::Expr > > target_exprs_owned_
std::vector< Analyzer::Expr * > translate_targets(std::vector< std::shared_ptr< Analyzer::Expr >> &target_exprs_owned, std::unordered_map< size_t, SQLTypeInfo > &target_exprs_type_infos, const std::vector< std::shared_ptr< Analyzer::Expr >> &scalar_sources, const std::list< std::shared_ptr< Analyzer::Expr >> &groupby_exprs, const RelCompound *compound, const RelAlgTranslator &translator, const ExecutorType executor_type)
std::list< std::shared_ptr< Analyzer::Expr > > translate_groupby_exprs(const RelCompound *compound, const std::vector< std::shared_ptr< Analyzer::Expr >> &scalar_sources)
const RelAlgNode * getInput(const size_t idx) const
Definition: RelAlgDag.h:826
JoinType get_join_type(const RelAlgNode *ra)
std::vector< TargetMetaInfo > get_targets_meta(const RA *ra_node, const std::vector< Analyzer::Expr * > &target_exprs)
std::unique_ptr< RelAlgDag > query_dag_
static RegisteredQueryHint defaults()
Definition: QueryHint.h:247
std::shared_ptr< const query_state::QueryState > query_state_
Catalog_Namespace::Catalog & cat_
size_t g_default_max_groups_buffer_entry_guess
Definition: Execute.cpp:109
std::vector< InputTableInfo > get_table_infos(const std::vector< InputDescriptor > &input_descs, Executor *executor)
void setOutputMetainfo(const std::vector< TargetMetaInfo > &targets_metainfo) const
Definition: RelAlgDag.h:795
std::tuple< std::vector< InputDescriptor >, std::list< std::shared_ptr< const InputColDescriptor > >, std::vector< std::shared_ptr< RexInput > > > get_input_desc(const RA *ra_node, const std::unordered_map< const RelAlgNode *, int > &input_to_nest_level, const std::vector< size_t > &input_permutation, const Catalog_Namespace::Catalog &cat)
const size_t inputCount() const
Definition: RelAlgDag.h:824
const std::vector< TargetMetaInfo > & getOutputMetainfo() const
Definition: RelAlgDag.h:810
Executor * executor_

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

RelAlgExecutor::WorkUnit RelAlgExecutor::createCompoundWorkUnit ( const RelCompound * compound,
const SortInfo & sort_info,
const ExecutionOptions & eo 
)
private

Definition at line 4339 of file RelAlgExecutor.cpp.

References cat_, CHECK_EQ, RegisteredQueryHint::defaults(), anonymous_namespace{RelAlgExecutor.cpp}::do_table_reordering(), executor_, ExecutionOptions::executor_type, QueryPlanDagExtractor::extractJoinInfo(), g_default_max_groups_buffer_entry_guess, g_from_table_reordering, anonymous_namespace{RelAlgExecutor.cpp}::get_input_desc(), anonymous_namespace{RelAlgExecutor.cpp}::get_input_nest_levels(), anonymous_namespace{RelAlgExecutor.cpp}::get_join_type(), anonymous_namespace{RelAlgExecutor.cpp}::get_left_deep_join_input_sizes(), get_table_infos(), anonymous_namespace{RelAlgExecutor.cpp}::get_targets_meta(), RelAlgNode::getInput(), getLeftDeepJoinTreesInfo(), RelAlgNode::getQueryPlanDagHash(), anonymous_namespace{RelAlgExecutor.cpp}::has_valid_query_plan_dag(), RelAlgNode::inputCount(), ExecutionOptions::just_explain, LEFT, anonymous_namespace{RelAlgExecutor.cpp}::left_deep_join_types(), now_, shared::printContainer(), query_dag_, query_state_, anonymous_namespace{RelAlgExecutor.cpp}::rewrite_quals(), RelAlgNode::setOutputMetainfo(), RelAlgExecutionUnit::simple_quals, RelCompound::size(), target_exprs_owned_, anonymous_namespace{RelAlgExecutor.cpp}::translate_groupby_exprs(), anonymous_namespace{RelAlgExecutor.cpp}::translate_quals(), anonymous_namespace{RelAlgExecutor.cpp}::translate_scalar_sources(), anonymous_namespace{RelAlgExecutor.cpp}::translate_targets(), translateLeftDeepJoinFilter(), and VLOG.

Referenced by createWorkUnit(), executeCompound(), executeDelete(), executeUpdate(), and getOuterFragmentCount().

4342  {
4343  std::vector<InputDescriptor> input_descs;
4344  std::list<std::shared_ptr<const InputColDescriptor>> input_col_descs;
4345  auto input_to_nest_level = get_input_nest_levels(compound, {});
4346  std::tie(input_descs, input_col_descs, std::ignore) =
4347  get_input_desc(compound, input_to_nest_level, {}, cat_);
4348  VLOG(3) << "input_descs=" << shared::printContainer(input_descs);
4349  const auto query_infos = get_table_infos(input_descs, executor_);
4350  CHECK_EQ(size_t(1), compound->inputCount());
4351  const auto left_deep_join =
4352  dynamic_cast<const RelLeftDeepInnerJoin*>(compound->getInput(0));
4353  JoinQualsPerNestingLevel left_deep_join_quals;
4354  const auto join_types = left_deep_join ? left_deep_join_types(left_deep_join)
4355  : std::vector<JoinType>{get_join_type(compound)};
4356  std::vector<size_t> input_permutation;
4357  std::vector<size_t> left_deep_join_input_sizes;
4358  std::optional<unsigned> left_deep_tree_id;
4359  if (left_deep_join) {
4360  left_deep_tree_id = left_deep_join->getId();
4361  left_deep_join_input_sizes = get_left_deep_join_input_sizes(left_deep_join);
4362  left_deep_join_quals = translateLeftDeepJoinFilter(
4363  left_deep_join, input_descs, input_to_nest_level, eo.just_explain);
4364  if (g_from_table_reordering &&
4365  std::find(join_types.begin(), join_types.end(), JoinType::LEFT) ==
4366  join_types.end()) {
4367  input_permutation = do_table_reordering(input_descs,
4368  input_col_descs,
4369  left_deep_join_quals,
4370  input_to_nest_level,
4371  compound,
4372  query_infos,
4373  executor_);
4374  input_to_nest_level = get_input_nest_levels(compound, input_permutation);
4375  std::tie(input_descs, input_col_descs, std::ignore) =
4376  get_input_desc(compound, input_to_nest_level, input_permutation, cat_);
4377  left_deep_join_quals = translateLeftDeepJoinFilter(
4378  left_deep_join, input_descs, input_to_nest_level, eo.just_explain);
4379  }
4380  }
4381  RelAlgTranslator translator(cat_,
4382  query_state_,
4383  executor_,
4384  input_to_nest_level,
4385  join_types,
4386  now_,
4387  eo.just_explain);
4388  const auto scalar_sources =
4389  translate_scalar_sources(compound, translator, eo.executor_type);
4390  const auto groupby_exprs = translate_groupby_exprs(compound, scalar_sources);
4391  const auto quals_cf = translate_quals(compound, translator);
4392  std::unordered_map<size_t, SQLTypeInfo> target_exprs_type_infos;
4393  const auto target_exprs = translate_targets(target_exprs_owned_,
4394  target_exprs_type_infos,
4395  scalar_sources,
4396  groupby_exprs,
4397  compound,
4398  translator,
4399  eo.executor_type);
4400 
4401  auto query_hint = RegisteredQueryHint::defaults();
4402  if (query_dag_) {
4403  auto candidate = query_dag_->getQueryHint(compound);
4404  if (candidate) {
4405  query_hint = *candidate;
4406  }
4407  }
4408  CHECK_EQ(compound->size(), target_exprs.size());
4409  const RelAlgExecutionUnit exe_unit = {input_descs,
4410  input_col_descs,
4411  quals_cf.simple_quals,
4412  rewrite_quals(quals_cf.quals),
4413  left_deep_join_quals,
4414  groupby_exprs,
4415  target_exprs,
4416  target_exprs_type_infos,
4417  nullptr,
4418  sort_info,
4419  0,
4420  query_hint,
4421  compound->getQueryPlanDagHash(),
4422  {},
4423  {},
4424  false,
4425  std::nullopt,
4426  query_state_};
4427  auto query_rewriter = std::make_unique<QueryRewriter>(query_infos, executor_);
4428  auto rewritten_exe_unit = query_rewriter->rewrite(exe_unit);
4429  const auto targets_meta = get_targets_meta(compound, rewritten_exe_unit.target_exprs);
4430  compound->setOutputMetainfo(targets_meta);
4431  auto& left_deep_trees_info = getLeftDeepJoinTreesInfo();
4432  if (left_deep_tree_id && left_deep_tree_id.has_value()) {
4433  left_deep_trees_info.emplace(left_deep_tree_id.value(),
4434  rewritten_exe_unit.join_quals);
4435  }
4436  if (has_valid_query_plan_dag(compound)) {
4437  auto join_info = QueryPlanDagExtractor::extractJoinInfo(
4438  compound, left_deep_tree_id, left_deep_trees_info, executor_);
4439  rewritten_exe_unit.hash_table_build_plan_dag = join_info.hash_table_plan_dag;
4440  rewritten_exe_unit.table_id_to_node_map = join_info.table_id_to_node_map;
4441  }
4442  return {rewritten_exe_unit,
4443  compound,
4444  g_default_max_groups_buffer_entry_guess,
4445  std::move(query_rewriter),
4446  input_permutation,
4447  left_deep_join_input_sizes};
4448 }
#define CHECK_EQ(x, y)
Definition: Logger.h:230
std::unordered_map< const RelAlgNode *, int > get_input_nest_levels(const RelAlgNode *ra_node, const std::vector< size_t > &input_permutation)
std::vector< JoinType > left_deep_join_types(const RelLeftDeepInnerJoin *left_deep_join)
JoinType
Definition: sqldefs.h:156
bool has_valid_query_plan_dag(const RelAlgNode *node)
static ExtractedJoinInfo extractJoinInfo(const RelAlgNode *top_node, std::optional< unsigned > left_deep_tree_id, std::unordered_map< unsigned, JoinQualsPerNestingLevel > left_deep_tree_infos, Executor *executor)
std::vector< size_t > do_table_reordering(std::vector< InputDescriptor > &input_descs, std::list< std::shared_ptr< const InputColDescriptor >> &input_col_descs, const JoinQualsPerNestingLevel &left_deep_join_quals, std::unordered_map< const RelAlgNode *, int > &input_to_nest_level, const RA *node, const std::vector< InputTableInfo > &query_infos, const Executor *executor)
size_t size() const override
Definition: RelAlgDag.h:1713
std::unordered_map< unsigned, JoinQualsPerNestingLevel > & getLeftDeepJoinTreesInfo()
std::vector< JoinCondition > JoinQualsPerNestingLevel
size_t getQueryPlanDagHash() const
Definition: RelAlgDag.h:808
std::vector< std::shared_ptr< Analyzer::Expr > > target_exprs_owned_
bool g_from_table_reordering
Definition: Execute.cpp:90
ExecutorType executor_type
std::vector< Analyzer::Expr * > translate_targets(std::vector< std::shared_ptr< Analyzer::Expr >> &target_exprs_owned, std::unordered_map< size_t, SQLTypeInfo > &target_exprs_type_infos, const std::vector< std::shared_ptr< Analyzer::Expr >> &scalar_sources, const std::list< std::shared_ptr< Analyzer::Expr >> &groupby_exprs, const RelCompound *compound, const RelAlgTranslator &translator, const ExecutorType executor_type)
std::list< std::shared_ptr< Analyzer::Expr > > translate_groupby_exprs(const RelCompound *compound, const std::vector< std::shared_ptr< Analyzer::Expr >> &scalar_sources)
const RelAlgNode * getInput(const size_t idx) const
Definition: RelAlgDag.h:826
JoinType get_join_type(const RelAlgNode *ra)
std::vector< TargetMetaInfo > get_targets_meta(const RA *ra_node, const std::vector< Analyzer::Expr * > &target_exprs)
std::unique_ptr< RelAlgDag > query_dag_
static RegisteredQueryHint defaults()
Definition: QueryHint.h:247
std::shared_ptr< const query_state::QueryState > query_state_
Catalog_Namespace::Catalog & cat_
std::vector< std::shared_ptr< Analyzer::Expr > > translate_scalar_sources(const RA *ra_node, const RelAlgTranslator &translator, const ::ExecutorType executor_type)
size_t g_default_max_groups_buffer_entry_guess
Definition: Execute.cpp:109
std::vector< InputTableInfo > get_table_infos(const std::vector< InputDescriptor > &input_descs, Executor *executor)
void setOutputMetainfo(const std::vector< TargetMetaInfo > &targets_metainfo) const
Definition: RelAlgDag.h:795
QualsConjunctiveForm translate_quals(const RelCompound *compound, const RelAlgTranslator &translator)
PrintContainer< CONTAINER > printContainer(CONTAINER &container)
Definition: misc.h:107
std::tuple< std::vector< InputDescriptor >, std::list< std::shared_ptr< const InputColDescriptor > >, std::vector< std::shared_ptr< RexInput > > > get_input_desc(const RA *ra_node, const std::unordered_map< const RelAlgNode *, int > &input_to_nest_level, const std::vector< size_t > &input_permutation, const Catalog_Namespace::Catalog &cat)
const size_t inputCount() const
Definition: RelAlgDag.h:824
Executor * executor_
#define VLOG(n)
Definition: Logger.h:316
std::list< std::shared_ptr< Analyzer::Expr > > simple_quals
JoinQualsPerNestingLevel translateLeftDeepJoinFilter(const RelLeftDeepInnerJoin *join, const std::vector< InputDescriptor > &input_descs, const std::unordered_map< const RelAlgNode *, int > &input_to_nest_level, const bool just_explain)
std::vector< size_t > get_left_deep_join_input_sizes(const RelLeftDeepInnerJoin *left_deep_join)
std::list< std::shared_ptr< Analyzer::Expr > > rewrite_quals(const std::list< std::shared_ptr< Analyzer::Expr >> &quals)

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

RelAlgExecutor::WorkUnit RelAlgExecutor::createFilterWorkUnit ( const RelFilter * filter,
const SortInfo & sort_info,
const bool  just_explain 
)
private

Definition at line 5178 of file RelAlgExecutor.cpp.

References cat_, CHECK_EQ, RegisteredQueryHint::defaults(), executor_, QueryPlanDagExtractor::extractJoinInfo(), fold_expr(), g_default_max_groups_buffer_entry_guess, anonymous_namespace{RelAlgExecutor.cpp}::get_input_desc(), anonymous_namespace{RelAlgExecutor.cpp}::get_input_nest_levels(), anonymous_namespace{RelAlgExecutor.cpp}::get_inputs_meta(), anonymous_namespace{RelAlgExecutor.cpp}::get_join_type(), anonymous_namespace{RelAlgExecutor.cpp}::get_raw_pointers(), get_table_infos(), RelFilter::getCondition(), getLeftDeepJoinTreesInfo(), RelAlgNode::getQueryPlanDagHash(), RelAlgNode::inputCount(), now_, query_dag_, query_state_, rewrite_expr(), RelAlgNode::setOutputMetainfo(), and target_exprs_owned_.

Referenced by createWorkUnit(), and executeFilter().

5180  {
5181  CHECK_EQ(size_t(1), filter->inputCount());
5182  std::vector<InputDescriptor> input_descs;
5183  std::list<std::shared_ptr<const InputColDescriptor>> input_col_descs;
5184  std::vector<TargetMetaInfo> in_metainfo;
5185  std::vector<std::shared_ptr<RexInput>> used_inputs_owned;
5186  std::vector<std::shared_ptr<Analyzer::Expr>> target_exprs_owned;
5187 
5188  const auto input_to_nest_level = get_input_nest_levels(filter, {});
5189  std::tie(input_descs, input_col_descs, used_inputs_owned) =
5190  get_input_desc(filter, input_to_nest_level, {}, cat_);
5191  const auto join_type = get_join_type(filter);
5192  RelAlgTranslator translator(cat_,
5193  query_state_,
5194  executor_,
5195  input_to_nest_level,
5196  {join_type},
5197  now_,
5198  just_explain);
5199  std::tie(in_metainfo, target_exprs_owned) =
5200  get_inputs_meta(filter, translator, used_inputs_owned, input_to_nest_level);
5201  const auto filter_expr = translator.translate(filter->getCondition());
5202  const auto query_infos = get_table_infos(input_descs, executor_);
5203 
5204  const auto qual = fold_expr(filter_expr.get());
5205  target_exprs_owned_.insert(
5206  target_exprs_owned_.end(), target_exprs_owned.begin(), target_exprs_owned.end());
5207 
5208  const auto target_exprs = get_raw_pointers(target_exprs_owned);
5209  filter->setOutputMetainfo(in_metainfo);
5210  const auto rewritten_qual = rewrite_expr(qual.get());
5211  auto query_hint = RegisteredQueryHint::defaults();
5212  if (query_dag_) {
5213  auto candidate = query_dag_->getQueryHint(filter);
5214  if (candidate) {
5215  query_hint = *candidate;
5216  }
5217  }
5218  auto join_info = QueryPlanDagExtractor::extractJoinInfo(
5219  filter, std::nullopt, getLeftDeepJoinTreesInfo(), executor_);
5220  return {{input_descs,
5221  input_col_descs,
5222  {},
5223  {rewritten_qual ? rewritten_qual : qual},
5224  {},
5225  {nullptr},
5226  target_exprs,
5227  {},
5228  nullptr,
5229  sort_info,
5230  0,
5231  query_hint,
5232  filter->getQueryPlanDagHash(),
5233  join_info.hash_table_plan_dag,
5234  join_info.table_id_to_node_map},
5235  filter,
5236  g_default_max_groups_buffer_entry_guess,
5237  nullptr};
5238 }
#define CHECK_EQ(x, y)
Definition: Logger.h:230
std::vector< Analyzer::Expr * > get_raw_pointers(std::vector< std::shared_ptr< Analyzer::Expr >> const &input)
std::unordered_map< const RelAlgNode *, int > get_input_nest_levels(const RelAlgNode *ra_node, const std::vector< size_t > &input_permutation)
static ExtractedJoinInfo extractJoinInfo(const RelAlgNode *top_node, std::optional< unsigned > left_deep_tree_id, std::unordered_map< unsigned, JoinQualsPerNestingLevel > left_deep_tree_infos, Executor *executor)
std::pair< std::vector< TargetMetaInfo >, std::vector< std::shared_ptr< Analyzer::Expr > > > get_inputs_meta(const RelFilter *filter, const RelAlgTranslator &translator, const std::vector< std::shared_ptr< RexInput >> &inputs_owned, const std::unordered_map< const RelAlgNode *, int > &input_to_nest_level)
const RexScalar * getCondition() const
Definition: RelAlgDag.h:1581
std::unordered_map< unsigned, JoinQualsPerNestingLevel > & getLeftDeepJoinTreesInfo()
Analyzer::ExpressionPtr rewrite_expr(const Analyzer::Expr *expr)
size_t getQueryPlanDagHash() const
Definition: RelAlgDag.h:808
std::vector< std::shared_ptr< Analyzer::Expr > > target_exprs_owned_
JoinType get_join_type(const RelAlgNode *ra)
std::unique_ptr< RelAlgDag > query_dag_
static RegisteredQueryHint defaults()
Definition: QueryHint.h:247
std::shared_ptr< const query_state::QueryState > query_state_
Catalog_Namespace::Catalog & cat_
size_t g_default_max_groups_buffer_entry_guess
Definition: Execute.cpp:109
std::vector< InputTableInfo > get_table_infos(const std::vector< InputDescriptor > &input_descs, Executor *executor)
void setOutputMetainfo(const std::vector< TargetMetaInfo > &targets_metainfo) const
Definition: RelAlgDag.h:795
std::tuple< std::vector< InputDescriptor >, std::list< std::shared_ptr< const InputColDescriptor > >, std::vector< std::shared_ptr< RexInput > > > get_input_desc(const RA *ra_node, const std::unordered_map< const RelAlgNode *, int > &input_to_nest_level, const std::vector< size_t > &input_permutation, const Catalog_Namespace::Catalog &cat)
const size_t inputCount() const
Definition: RelAlgDag.h:824
Executor * executor_
std::shared_ptr< Analyzer::Expr > fold_expr(const Analyzer::Expr *expr)

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

WorkUnit RelAlgExecutor::createJoinWorkUnit ( const RelJoin * ,
const SortInfo & ,
const bool  just_explain 
)
private

RelAlgExecutor::WorkUnit RelAlgExecutor::createProjectWorkUnit ( const RelProject * project,
const SortInfo & sort_info,
const ExecutionOptions & eo 
)
private

Definition at line 4733 of file RelAlgExecutor.cpp.

References cat_, RegisteredQueryHint::defaults(), anonymous_namespace{RelAlgExecutor.cpp}::do_table_reordering(), executor_, ExecutionOptions::executor_type, QueryPlanDagExtractor::extractJoinInfo(), g_default_max_groups_buffer_entry_guess, g_from_table_reordering, anonymous_namespace{RelAlgExecutor.cpp}::get_input_desc(), anonymous_namespace{RelAlgExecutor.cpp}::get_input_nest_levels(), anonymous_namespace{RelAlgExecutor.cpp}::get_join_type(), anonymous_namespace{RelAlgExecutor.cpp}::get_left_deep_join_input_sizes(), anonymous_namespace{RelAlgExecutor.cpp}::get_raw_pointers(), get_table_infos(), anonymous_namespace{RelAlgExecutor.cpp}::get_targets_meta(), RelAlgNode::getInput(), getLeftDeepJoinTreesInfo(), RelAlgNode::getQueryPlanDagHash(), anonymous_namespace{RelAlgExecutor.cpp}::has_valid_query_plan_dag(), ExecutionOptions::just_explain, anonymous_namespace{RelAlgExecutor.cpp}::left_deep_join_types(), now_, query_dag_, query_state_, RelAlgNode::setOutputMetainfo(), target_exprs_owned_, anonymous_namespace{RelAlgExecutor.cpp}::translate_scalar_sources(), and translateLeftDeepJoinFilter().

Referenced by createWorkUnit(), executeDelete(), executeProject(), executeUpdate(), and getOuterFragmentCount().

4736  {
4737  std::vector<InputDescriptor> input_descs;
4738  std::list<std::shared_ptr<const InputColDescriptor>> input_col_descs;
4739  auto input_to_nest_level = get_input_nest_levels(project, {});
4740  std::tie(input_descs, input_col_descs, std::ignore) =
4741  get_input_desc(project, input_to_nest_level, {}, cat_);
4742  const auto query_infos = get_table_infos(input_descs, executor_);
4743 
4744  const auto left_deep_join =
4745  dynamic_cast<const RelLeftDeepInnerJoin*>(project->getInput(0));
4746  JoinQualsPerNestingLevel left_deep_join_quals;
4747  const auto join_types = left_deep_join ? left_deep_join_types(left_deep_join)
4748  : std::vector<JoinType>{get_join_type(project)};
4749  std::vector<size_t> input_permutation;
4750  std::vector<size_t> left_deep_join_input_sizes;
4751  std::optional<unsigned> left_deep_tree_id;
4752  if (left_deep_join) {
4753  left_deep_tree_id = left_deep_join->getId();
4754  left_deep_join_input_sizes = get_left_deep_join_input_sizes(left_deep_join);
4755  const auto query_infos = get_table_infos(input_descs, executor_);
4756  left_deep_join_quals = translateLeftDeepJoinFilter(
4757  left_deep_join, input_descs, input_to_nest_level, eo.just_explain);
4758  if (g_from_table_reordering) {
4759  input_permutation = do_table_reordering(input_descs,
4760  input_col_descs,
4761  left_deep_join_quals,
4762  input_to_nest_level,
4763  project,
4764  query_infos,
4765  executor_);
4766  input_to_nest_level = get_input_nest_levels(project, input_permutation);
4767  std::tie(input_descs, input_col_descs, std::ignore) =
4768  get_input_desc(project, input_to_nest_level, input_permutation, cat_);
4769  left_deep_join_quals = translateLeftDeepJoinFilter(
4770  left_deep_join, input_descs, input_to_nest_level, eo.just_explain);
4771  }
4772  }
4773 
4774  RelAlgTranslator translator(cat_,
4775  query_state_,
4776  executor_,
4777  input_to_nest_level,
4778  join_types,
4779  now_,
4780  eo.just_explain);
4781  const auto target_exprs_owned =
4782  translate_scalar_sources(project, translator, eo.executor_type);
4783 
4784  target_exprs_owned_.insert(
4785  target_exprs_owned_.end(), target_exprs_owned.begin(), target_exprs_owned.end());
4786  const auto target_exprs = get_raw_pointers(target_exprs_owned);
4787  auto query_hint = RegisteredQueryHint::defaults();
4788  if (query_dag_) {
4789  auto candidate = query_dag_->getQueryHint(project);
4790  if (candidate) {
4791  query_hint = *candidate;
4792  }
4793  }
4794  const RelAlgExecutionUnit exe_unit = {input_descs,
4795  input_col_descs,
4796  {},
4797  {},
4798  left_deep_join_quals,
4799  {nullptr},
4800  target_exprs,
4801  {},
4802  nullptr,
4803  sort_info,
4804  0,
4805  query_hint,
4806  project->getQueryPlanDagHash(),
4807  {},
4808  {},
4809  false,
4810  std::nullopt,
4811  query_state_};
4812  auto query_rewriter = std::make_unique<QueryRewriter>(query_infos, executor_);
4813  auto rewritten_exe_unit = query_rewriter->rewrite(exe_unit);
4814  const auto targets_meta = get_targets_meta(project, rewritten_exe_unit.target_exprs);
4815  project->setOutputMetainfo(targets_meta);
4816  auto& left_deep_trees_info = getLeftDeepJoinTreesInfo();
4817  if (left_deep_tree_id && left_deep_tree_id.has_value()) {
4818  left_deep_trees_info.emplace(left_deep_tree_id.value(),
4819  rewritten_exe_unit.join_quals);
4820  }
4821  if (has_valid_query_plan_dag(project)) {
4822  auto join_info = QueryPlanDagExtractor::extractJoinInfo(
4823  project, left_deep_tree_id, left_deep_trees_info, executor_);
4824  rewritten_exe_unit.hash_table_build_plan_dag = join_info.hash_table_plan_dag;
4825  rewritten_exe_unit.table_id_to_node_map = join_info.table_id_to_node_map;
4826  }
4827  return {rewritten_exe_unit,
4828  project,
4829  g_default_max_groups_buffer_entry_guess,
4830  std::move(query_rewriter),
4831  input_permutation,
4832  left_deep_join_input_sizes};
4833 }
std::vector< Analyzer::Expr * > get_raw_pointers(std::vector< std::shared_ptr< Analyzer::Expr >> const &input)
std::unordered_map< const RelAlgNode *, int > get_input_nest_levels(const RelAlgNode *ra_node, const std::vector< size_t > &input_permutation)
std::vector< JoinType > left_deep_join_types(const RelLeftDeepInnerJoin *left_deep_join)
JoinType
Definition: sqldefs.h:156
bool has_valid_query_plan_dag(const RelAlgNode *node)
static ExtractedJoinInfo extractJoinInfo(const RelAlgNode *top_node, std::optional< unsigned > left_deep_tree_id, std::unordered_map< unsigned, JoinQualsPerNestingLevel > left_deep_tree_infos, Executor *executor)
std::vector< size_t > do_table_reordering(std::vector< InputDescriptor > &input_descs, std::list< std::shared_ptr< const InputColDescriptor >> &input_col_descs, const JoinQualsPerNestingLevel &left_deep_join_quals, std::unordered_map< const RelAlgNode *, int > &input_to_nest_level, const RA *node, const std::vector< InputTableInfo > &query_infos, const Executor *executor)
std::unordered_map< unsigned, JoinQualsPerNestingLevel > & getLeftDeepJoinTreesInfo()
std::vector< JoinCondition > JoinQualsPerNestingLevel
size_t getQueryPlanDagHash() const
Definition: RelAlgDag.h:808
std::vector< std::shared_ptr< Analyzer::Expr > > target_exprs_owned_
bool g_from_table_reordering
Definition: Execute.cpp:90
ExecutorType executor_type
const RelAlgNode * getInput(const size_t idx) const
Definition: RelAlgDag.h:826
JoinType get_join_type(const RelAlgNode *ra)
std::vector< TargetMetaInfo > get_targets_meta(const RA *ra_node, const std::vector< Analyzer::Expr * > &target_exprs)
std::unique_ptr< RelAlgDag > query_dag_
static RegisteredQueryHint defaults()
Definition: QueryHint.h:247
std::shared_ptr< const query_state::QueryState > query_state_
Catalog_Namespace::Catalog & cat_
std::vector< std::shared_ptr< Analyzer::Expr > > translate_scalar_sources(const RA *ra_node, const RelAlgTranslator &translator, const ::ExecutorType executor_type)
size_t g_default_max_groups_buffer_entry_guess
Definition: Execute.cpp:109
std::vector< InputTableInfo > get_table_infos(const std::vector< InputDescriptor > &input_descs, Executor *executor)
void setOutputMetainfo(const std::vector< TargetMetaInfo > &targets_metainfo) const
Definition: RelAlgDag.h:795
std::tuple< std::vector< InputDescriptor >, std::list< std::shared_ptr< const InputColDescriptor > >, std::vector< std::shared_ptr< RexInput > > > get_input_desc(const RA *ra_node, const std::unordered_map< const RelAlgNode *, int > &input_to_nest_level, const std::vector< size_t > &input_permutation, const Catalog_Namespace::Catalog &cat)
Executor * executor_
JoinQualsPerNestingLevel translateLeftDeepJoinFilter(const RelLeftDeepInnerJoin *join, const std::vector< InputDescriptor > &input_descs, const std::unordered_map< const RelAlgNode *, int > &input_to_nest_level, const bool just_explain)
std::vector< size_t > get_left_deep_join_input_sizes(const RelLeftDeepInnerJoin *left_deep_join)

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

RelAlgExecutor::WorkUnit RelAlgExecutor::createSortInputWorkUnit ( const RelSort * sort,
std::list< Analyzer::OrderEntry > &  order_entries,
const ExecutionOptions & eo 
)
private

Definition at line 3343 of file RelAlgExecutor.cpp.

References CHECK_GT, RelSort::collationCount(), SpeculativeTopNBlacklist::contains(), createWorkUnit(), Default, anonymous_namespace{RelAlgExecutor.cpp}::first_oe_is_desc(), g_default_max_groups_buffer_entry_guess, anonymous_namespace{RelAlgExecutor.cpp}::get_scan_limit(), get_target_info(), RelAlgNode::getInput(), RelSort::getLimit(), RelSort::getOffset(), RelAlgExecutionUnit::input_descs, RelSort::isLimitDelivered(), RelAlgNode::setOutputMetainfo(), speculative_topn_blacklist_, SpeculativeTopN, and StreamingTopN.

Referenced by executeRelAlgQuerySingleStep(), and executeSort().

3346  {
3347  const auto source = sort->getInput(0);
3348  const size_t limit = sort->getLimit();
3349  const size_t offset = sort->getOffset();
3350  const size_t scan_limit = sort->collationCount() ? 0 : get_scan_limit(source, limit);
3351  const size_t scan_total_limit =
3352  scan_limit ? get_scan_limit(source, scan_limit + offset) : 0;
3353  size_t max_groups_buffer_entry_guess{
3354  scan_total_limit ? scan_total_limit : g_default_max_groups_buffer_entry_guess};
3355  SortAlgorithm sort_algorithm{SortAlgorithm::SpeculativeTopN};
3356  SortInfo sort_info{
3357  order_entries, sort_algorithm, limit, offset, sort->isLimitDelivered()};
3358  auto source_work_unit = createWorkUnit(source, sort_info, eo);
3359  const auto& source_exe_unit = source_work_unit.exe_unit;
3360 
3361  // we do not allow sorting geometry or array types
3362  for (auto order_entry : order_entries) {
3363  CHECK_GT(order_entry.tle_no, 0); // tle_no is a 1-base index
3364  const auto& te = source_exe_unit.target_exprs[order_entry.tle_no - 1];
3365  const auto& ti = get_target_info(te, false);
3366  if (ti.sql_type.is_geometry() || ti.sql_type.is_array()) {
3367  throw std::runtime_error(
3368  "Columns with geometry or array types cannot be used in an ORDER BY clause.");
3369  }
3370  }
3371 
3372  if (source_exe_unit.groupby_exprs.size() == 1) {
3373  if (!source_exe_unit.groupby_exprs.front()) {
3374  sort_algorithm = SortAlgorithm::StreamingTopN;
3375  } else {
3376  if (speculative_topn_blacklist_.contains(source_exe_unit.groupby_exprs.front(),
3377  first_oe_is_desc(order_entries))) {
3378  sort_algorithm = SortAlgorithm::Default;
3379  }
3380  }
3381  }
3382 
3383  sort->setOutputMetainfo(source->getOutputMetainfo());
3384  // NB: the `body` field of the returned `WorkUnit` needs to be the `source` node,
3385  // not the `sort`. The aggregator needs the pre-sorted result from leaves.
3386  return {RelAlgExecutionUnit{source_exe_unit.input_descs,
3387  std::move(source_exe_unit.input_col_descs),
3388  source_exe_unit.simple_quals,
3389  source_exe_unit.quals,
3390  source_exe_unit.join_quals,
3391  source_exe_unit.groupby_exprs,
3392  source_exe_unit.target_exprs,
3393  source_exe_unit.target_exprs_original_type_infos,
3394  nullptr,
3395  {sort_info.order_entries,
3396  sort_algorithm,
3397  limit,
3398  offset,
3399  sort_info.limit_delivered},
3400  scan_total_limit,
3401  source_exe_unit.query_hint,
3402  source_exe_unit.query_plan_dag_hash,
3403  source_exe_unit.hash_table_build_plan_dag,
3404  source_exe_unit.table_id_to_node_map,
3405  source_exe_unit.use_bump_allocator,
3406  source_exe_unit.union_all,
3407  source_exe_unit.query_state},
3408  source,
3409  max_groups_buffer_entry_guess,
3410  std::move(source_work_unit.query_rewriter),
3411  source_work_unit.input_permutation,
3412  source_work_unit.left_deep_join_input_sizes};
3413 }
size_t getOffset() const
Definition: RelAlgDag.h:1835
bool contains(const std::shared_ptr< Analyzer::Expr > expr, const bool desc) const
size_t get_scan_limit(const RelAlgNode *ra, const size_t limit)
static SpeculativeTopNBlacklist speculative_topn_blacklist_
std::vector< InputDescriptor > input_descs
#define CHECK_GT(x, y)
Definition: Logger.h:234
TargetInfo get_target_info(const Analyzer::Expr *target_expr, const bool bigint_count)
Definition: TargetInfo.h:97
WorkUnit createWorkUnit(const RelAlgNode *, const SortInfo &, const ExecutionOptions &eo)
const RelAlgNode * getInput(const size_t idx) const
Definition: RelAlgDag.h:826
size_t collationCount() const
Definition: RelAlgDag.h:1816
size_t getLimit() const
Definition: RelAlgDag.h:1833
size_t g_default_max_groups_buffer_entry_guess
Definition: Execute.cpp:109
void setOutputMetainfo(const std::vector< TargetMetaInfo > &targets_metainfo) const
Definition: RelAlgDag.h:795
bool isLimitDelivered() const
Definition: RelAlgDag.h:1831
bool first_oe_is_desc(const std::list< Analyzer::OrderEntry > &order_entries)

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

RelAlgExecutor::TableFunctionWorkUnit RelAlgExecutor::createTableFunctionWorkUnit ( const RelTableFunction * table_func,
const bool  just_explain,
const bool  is_gpu 
)
private

Definition at line 4957 of file RelAlgExecutor.cpp.

References bind_table_function(), cat_, CHECK, CHECK_EQ, CHECK_GT, CHECK_LT, RelTableFunction::countRexLiteralArgs(), DEFAULT_ROW_MULTIPLIER_VALUE, executor_, ext_arg_type_to_type_info(), anonymous_namespace{RelAlgExecutor.cpp}::get_input_desc(), anonymous_namespace{RelAlgExecutor.cpp}::get_input_nest_levels(), anonymous_namespace{RelAlgExecutor.cpp}::get_raw_pointers(), get_table_infos(), anonymous_namespace{RelAlgExecutor.cpp}::get_targets_meta(), RelTableFunction::getColInputsSize(), RelTableFunction::getFunctionName(), RelTableFunction::getTableFuncInputAt(), kENCODING_ARRAY, kINT, LOG, now_, query_state_, RelAlgNode::setOutputMetainfo(), TableFunctions, TableFunctionExecutionUnit::target_exprs, target_exprs_owned_, to_string(), TRANSIENT_DICT_ID, anonymous_namespace{RelAlgExecutor.cpp}::translate_scalar_sources(), UNREACHABLE, and logger::WARNING.

Referenced by executeTableFunction().

4960  {
4961  std::vector<InputDescriptor> input_descs;
4962  std::list<std::shared_ptr<const InputColDescriptor>> input_col_descs;
4963  auto input_to_nest_level = get_input_nest_levels(rel_table_func, {});
4964  std::tie(input_descs, input_col_descs, std::ignore) =
4965  get_input_desc(rel_table_func, input_to_nest_level, {}, cat_);
4966  const auto query_infos = get_table_infos(input_descs, executor_);
4967  RelAlgTranslator translator(
4968  cat_, query_state_, executor_, input_to_nest_level, {}, now_, just_explain);
4969  const auto input_exprs_owned = translate_scalar_sources(
4970  rel_table_func, translator, ::ExecutorType::TableFunctions);
4971  target_exprs_owned_.insert(
4972  target_exprs_owned_.end(), input_exprs_owned.begin(), input_exprs_owned.end());
4973  auto input_exprs = get_raw_pointers(input_exprs_owned);
4974 
4975  const auto table_function_impl_and_type_infos = [=]() {
4976  if (is_gpu) {
4977  try {
4978  return bind_table_function(
4979  rel_table_func->getFunctionName(), input_exprs_owned, is_gpu);
4980  } catch (ExtensionFunctionBindingError& e) {
4981  LOG(WARNING) << "createTableFunctionWorkUnit[GPU]: " << e.what()
4982  << " Redirecting " << rel_table_func->getFunctionName()
4983  << " step to run on CPU.";
4984  throw QueryMustRunOnCpu();
4985  }
4986  } else {
4987  try {
4988  return bind_table_function(
4989  rel_table_func->getFunctionName(), input_exprs_owned, is_gpu);
4990  } catch (ExtensionFunctionBindingError& e) {
4991  LOG(WARNING) << "createTableFunctionWorkUnit[CPU]: " << e.what();
4992  throw;
4993  }
4994  }
4995  }();
4996  const auto& table_function_impl = std::get<0>(table_function_impl_and_type_infos);
4997  const auto& table_function_type_infos = std::get<1>(table_function_impl_and_type_infos);
4998 
4999  size_t output_row_sizing_param = 0;
5000  if (table_function_impl
5001  .hasUserSpecifiedOutputSizeParameter()) { // constant and row multiplier
5002  const auto parameter_index =
5003  table_function_impl.getOutputRowSizeParameter(table_function_type_infos);
5004  CHECK_GT(parameter_index, size_t(0));
5005  if (rel_table_func->countRexLiteralArgs() == table_function_impl.countScalarArgs()) {
5006  const auto parameter_expr =
5007  rel_table_func->getTableFuncInputAt(parameter_index - 1);
5008  const auto parameter_expr_literal = dynamic_cast<const RexLiteral*>(parameter_expr);
5009  if (!parameter_expr_literal) {
5010  throw std::runtime_error(
5011  "Provided output buffer sizing parameter is not a literal. Only literal "
5012  "values are supported with output buffer sizing configured table "
5013  "functions.");
5014  }
5015  int64_t literal_val = parameter_expr_literal->getVal<int64_t>();
5016  if (literal_val < 0) {
5017  throw std::runtime_error("Provided output sizing parameter " +
5018  std::to_string(literal_val) +
5019  " must be positive integer.");
5020  }
5021  output_row_sizing_param = static_cast<size_t>(literal_val);
5022  } else {
5023  // RowMultiplier not specified in the SQL query. Set it to 1
5024  output_row_sizing_param = 1; // default value for RowMultiplier
5025  static Datum d = {DEFAULT_ROW_MULTIPLIER_VALUE};
5026  static auto DEFAULT_ROW_MULTIPLIER_EXPR =
5027  makeExpr<Analyzer::Constant>(kINT, false, d);
5028  // Push the constant 1 to input_exprs
5029  input_exprs.insert(input_exprs.begin() + parameter_index - 1,
5030  DEFAULT_ROW_MULTIPLIER_EXPR.get());
5031  }
5032  } else if (table_function_impl.hasNonUserSpecifiedOutputSize()) {
5033  output_row_sizing_param = table_function_impl.getOutputRowSizeParameter();
5034  } else {
5035  UNREACHABLE();
5036  }
5037 
5038  std::vector<Analyzer::ColumnVar*> input_col_exprs;
5039  size_t input_index = 0;
5040  size_t arg_index = 0;
5041  const auto table_func_args = table_function_impl.getInputArgs();
5042  CHECK_EQ(table_func_args.size(), table_function_type_infos.size());
5043  for (const auto& ti : table_function_type_infos) {
5044  if (ti.is_column_list()) {
5045  for (int i = 0; i < ti.get_dimension(); i++) {
5046  auto& input_expr = input_exprs[input_index];
5047  auto col_var = dynamic_cast<Analyzer::ColumnVar*>(input_expr);
5048  CHECK(col_var);
5049 
5050  // avoid setting type info to ti here since ti doesn't have all the
5051  // properties correctly set
5052  auto type_info = input_expr->get_type_info();
5053  if (ti.is_column_array()) {
5054  type_info.set_compression(kENCODING_ARRAY);
5055  type_info.set_subtype(type_info.get_subtype()); // set type to be subtype
5056  } else {
5057  type_info.set_subtype(type_info.get_type()); // set type to be subtype
5058  }
5059  type_info.set_type(ti.get_type()); // set type to column list
5060  type_info.set_dimension(ti.get_dimension());
5061  input_expr->set_type_info(type_info);
5062 
5063  input_col_exprs.push_back(col_var);
5064  input_index++;
5065  }
5066  } else if (ti.is_column()) {
5067  auto& input_expr = input_exprs[input_index];
5068  auto col_var = dynamic_cast<Analyzer::ColumnVar*>(input_expr);
5069  CHECK(col_var);
5070  // same here! avoid setting type info to ti since it doesn't have all the
5071  // properties correctly set
5072  auto type_info = input_expr->get_type_info();
5073  if (ti.is_column_array()) {
5074  type_info.set_compression(kENCODING_ARRAY);
5075  type_info.set_subtype(type_info.get_subtype()); // set type to be subtype
5076  } else {
5077  type_info.set_subtype(type_info.get_type()); // set type to be subtype
5078  }
5079  type_info.set_type(ti.get_type()); // set type to column
5080  input_expr->set_type_info(type_info);
5081  input_col_exprs.push_back(col_var);
5082  input_index++;
5083  } else {
5084  auto input_expr = input_exprs[input_index];
5085  auto ext_func_arg_ti = ext_arg_type_to_type_info(table_func_args[arg_index]);
5086  if (ext_func_arg_ti != input_expr->get_type_info()) {
5087  input_exprs[input_index] = input_expr->add_cast(ext_func_arg_ti).get();
5088  }
5089  input_index++;
5090  }
5091  arg_index++;
5092  }
5093  CHECK_EQ(input_col_exprs.size(), rel_table_func->getColInputsSize());
5094  std::vector<Analyzer::Expr*> table_func_outputs;
5095  constexpr int32_t transient_pos{-1};
5096  for (size_t i = 0; i < table_function_impl.getOutputsSize(); i++) {
5097  auto ti = table_function_impl.getOutputSQLType(i);
5098  if (ti.is_dict_encoded_string() || ti.is_text_encoding_dict_array()) {
5099  auto p = table_function_impl.getInputID(i);
5100 
5101  int32_t input_pos = p.first;
5102  if (input_pos == transient_pos) {
5103  ti.set_comp_param(TRANSIENT_DICT_ID);
5104  } else {
5105  // Iterate over the list of arguments to compute the offset. Use this offset to
5106  // get the corresponding input
5107  int32_t offset = 0;
5108  for (int j = 0; j < input_pos; j++) {
5109  const auto ti = table_function_type_infos[j];
5110  offset += ti.is_column_list() ? ti.get_dimension() : 1;
5111  }
5112  input_pos = offset + p.second;
5113 
5114  CHECK_LT(input_pos, input_exprs.size());
5115  int32_t comp_param =
5116  input_exprs_owned[input_pos]->get_type_info().get_comp_param();
5117  ti.set_comp_param(comp_param);
5118  }
5119  }
5120  target_exprs_owned_.push_back(std::make_shared<Analyzer::ColumnVar>(ti, 0, i, -1));
5121  table_func_outputs.push_back(target_exprs_owned_.back().get());
5122  }
5123  const TableFunctionExecutionUnit exe_unit = {
5124  input_descs,
5125  input_col_descs,
5126  input_exprs, // table function inputs
5127  input_col_exprs, // table function column inputs (duplicates w/ above)
5128  table_func_outputs, // table function projected exprs
5129  output_row_sizing_param, // output buffer sizing param
5130  table_function_impl};
5131  const auto targets_meta = get_targets_meta(rel_table_func, exe_unit.target_exprs);
5132  rel_table_func->setOutputMetainfo(targets_meta);
5133  return {exe_unit, rel_table_func};
5134 }
#define CHECK_EQ(x, y)
Definition: Logger.h:230
std::vector< Analyzer::Expr * > get_raw_pointers(std::vector< std::shared_ptr< Analyzer::Expr >> const &input)
std::unordered_map< const RelAlgNode *, int > get_input_nest_levels(const RelAlgNode *ra_node, const std::vector< size_t > &input_permutation)
#define LOG(tag)
Definition: Logger.h:216
#define UNREACHABLE()
Definition: Logger.h:266
#define CHECK_GT(x, y)
Definition: Logger.h:234
std::string to_string(char const *&&v)
std::vector< std::shared_ptr< Analyzer::Expr > > target_exprs_owned_
std::vector< TargetMetaInfo > get_targets_meta(const RA *ra_node, const std::vector< Analyzer::Expr * > &target_exprs)
#define CHECK_LT(x, y)
Definition: Logger.h:232
#define TRANSIENT_DICT_ID
Definition: sqltypes.h:334
#define DEFAULT_ROW_MULTIPLIER_VALUE
std::shared_ptr< const query_state::QueryState > query_state_
Catalog_Namespace::Catalog & cat_
std::vector< std::shared_ptr< Analyzer::Expr > > translate_scalar_sources(const RA *ra_node, const RelAlgTranslator &translator, const ::ExecutorType executor_type)
const std::tuple< table_functions::TableFunction, std::vector< SQLTypeInfo > > bind_table_function(std::string name, Analyzer::ExpressionPtrVector input_args, const std::vector< table_functions::TableFunction > &table_funcs, const bool is_gpu)
#define CHECK(condition)
Definition: Logger.h:222
std::vector< InputTableInfo > get_table_infos(const std::vector< InputDescriptor > &input_descs, Executor *executor)
Definition: sqltypes.h:59
std::vector< Analyzer::Expr * > target_exprs
std::tuple< std::vector< InputDescriptor >, std::list< std::shared_ptr< const InputColDescriptor > >, std::vector< std::shared_ptr< RexInput > > > get_input_desc(const RA *ra_node, const std::unordered_map< const RelAlgNode *, int > &input_to_nest_level, const std::vector< size_t > &input_permutation, const Catalog_Namespace::Catalog &cat)
Executor * executor_
SQLTypeInfo ext_arg_type_to_type_info(const ExtArgumentType ext_arg_type)

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

RelAlgExecutor::WorkUnit RelAlgExecutor::createUnionWorkUnit ( const RelLogicalUnion *  logical_union,
const SortInfo sort_info,
const ExecutionOptions eo 
)
private

Definition at line 4853 of file RelAlgExecutor.cpp.

References gpu_enabled::accumulate(), shared::append_move(), cat_, CHECK, RelRexToStringConfig::defaults(), RegisteredQueryHint::defaults(), EMPTY_HASHED_PLAN_DAG_KEY, executor_, g_default_max_groups_buffer_entry_guess, anonymous_namespace{RelAlgExecutor.cpp}::get_input_desc(), anonymous_namespace{RelAlgExecutor.cpp}::get_input_nest_levels(), anonymous_namespace{RelAlgExecutor.cpp}::get_raw_pointers(), get_table_infos(), anonymous_namespace{RelAlgExecutor.cpp}::get_targets_meta(), RelAlgNode::getInput(), RelAlgNode::getOutputMetainfo(), RelLogicalUnion::isAll(), shared::printContainer(), query_state_, RelAlgNode::setOutputMetainfo(), anonymous_namespace{RelAlgExecutor.cpp}::target_exprs_for_union(), target_exprs_owned_, RelAlgNode::toString(), and VLOG.

Referenced by executeUnion().

4856  {
4857  std::vector<InputDescriptor> input_descs;
4858  std::list<std::shared_ptr<const InputColDescriptor>> input_col_descs;
4859  // Map ra input ptr to index (0, 1).
4860  auto input_to_nest_level = get_input_nest_levels(logical_union, {});
4861  std::tie(input_descs, input_col_descs, std::ignore) =
4862  get_input_desc(logical_union, input_to_nest_level, {}, cat_);
4863  const auto query_infos = get_table_infos(input_descs, executor_);
4864  auto const max_num_tuples =
4865  std::accumulate(query_infos.cbegin(),
4866  query_infos.cend(),
4867  size_t(0),
4868  [](auto max, auto const& query_info) {
4869  return std::max(max, query_info.info.getNumTuples());
4870  });
4871 
4872  VLOG(3) << "input_to_nest_level.size()=" << input_to_nest_level.size() << " Pairs are:";
4873  for (auto& pair : input_to_nest_level) {
4874  VLOG(3) << " (" << pair.first->toString(RelRexToStringConfig::defaults()) << ", "
4875  << pair.second << ')';
4876  }
4877 
4878  // For UNION queries, we need to keep the target_exprs from both subqueries since they
4879  // may differ on StringDictionaries.
4880  std::vector<Analyzer::Expr*> target_exprs_pair[2];
4881  for (unsigned i = 0; i < 2; ++i) {
4882  auto input_exprs_owned = target_exprs_for_union(logical_union->getInput(i));
4883  CHECK(!input_exprs_owned.empty())
4884  << "No metainfo found for input node(" << i << ") "
4885  << logical_union->getInput(i)->toString(RelRexToStringConfig::defaults());
4886  VLOG(3) << "i(" << i << ") input_exprs_owned.size()=" << input_exprs_owned.size();
4887  for (auto& input_expr : input_exprs_owned) {
4888  VLOG(3) << " " << input_expr->toString();
4889  }
4890  target_exprs_pair[i] = get_raw_pointers(input_exprs_owned);
4891  shared::append_move(target_exprs_owned_, std::move(input_exprs_owned));
4892  }
4893 
4894  VLOG(3) << "input_descs=" << shared::printContainer(input_descs)
4895  << " input_col_descs=" << shared::printContainer(input_col_descs)
4896  << " target_exprs.size()=" << target_exprs_pair[0].size()
4897  << " max_num_tuples=" << max_num_tuples;
4898 
4899  const RelAlgExecutionUnit exe_unit = {input_descs,
4900  input_col_descs,
4901  {}, // quals_cf.simple_quals,
4902  {}, // rewrite_quals(quals_cf.quals),
4903  {},
4904  {nullptr},
4905  target_exprs_pair[0],
4906  {},
4907  nullptr,
4908  sort_info,
4909  max_num_tuples,
4912  {},
4913  {},
4914  false,
4915  logical_union->isAll(),
4916  query_state_,
4917  target_exprs_pair[1]};
4918  auto query_rewriter = std::make_unique<QueryRewriter>(query_infos, executor_);
4919  const auto rewritten_exe_unit = query_rewriter->rewrite(exe_unit);
4920 
4921  RelAlgNode const* input0 = logical_union->getInput(0);
4922  if (auto const* node = dynamic_cast<const RelCompound*>(input0)) {
4923  logical_union->setOutputMetainfo(
4924  get_targets_meta(node, rewritten_exe_unit.target_exprs));
4925  } else if (auto const* node = dynamic_cast<const RelProject*>(input0)) {
4926  logical_union->setOutputMetainfo(
4927  get_targets_meta(node, rewritten_exe_unit.target_exprs));
4928  } else if (auto const* node = dynamic_cast<const RelLogicalUnion*>(input0)) {
4929  logical_union->setOutputMetainfo(
4930  get_targets_meta(node, rewritten_exe_unit.target_exprs));
4931  } else if (auto const* node = dynamic_cast<const RelAggregate*>(input0)) {
4932  logical_union->setOutputMetainfo(
4933  get_targets_meta(node, rewritten_exe_unit.target_exprs));
4934  } else if (auto const* node = dynamic_cast<const RelScan*>(input0)) {
4935  logical_union->setOutputMetainfo(
4936  get_targets_meta(node, rewritten_exe_unit.target_exprs));
4937  } else if (auto const* node = dynamic_cast<const RelFilter*>(input0)) {
4938  logical_union->setOutputMetainfo(
4939  get_targets_meta(node, rewritten_exe_unit.target_exprs));
4940  } else if (dynamic_cast<const RelSort*>(input0)) {
4941  throw QueryNotSupported("LIMIT and OFFSET are not currently supported with UNION.");
4942  } else {
4943  throw QueryNotSupported("Unsupported input type: " +
4945  }
4946  VLOG(3) << "logical_union->getOutputMetainfo()="
4947  << shared::printContainer(logical_union->getOutputMetainfo())
4948  << " rewritten_exe_unit.input_col_descs.front()->getScanDesc().getTableId()="
4949  << rewritten_exe_unit.input_col_descs.front()->getScanDesc().getTableId();
4950 
4951  return {rewritten_exe_unit,
4952  logical_union,
4954  std::move(query_rewriter)};
4955 }
bool isAll() const
Definition: RelAlgDag.h:2301
std::vector< Analyzer::Expr * > get_raw_pointers(std::vector< std::shared_ptr< Analyzer::Expr >> const &input)
std::unordered_map< const RelAlgNode *, int > get_input_nest_levels(const RelAlgNode *ra_node, const std::vector< size_t > &input_permutation)
constexpr QueryPlanHash EMPTY_HASHED_PLAN_DAG_KEY
size_t append_move(std::vector< T > &destination, std::vector< T > &&source)
Definition: misc.h:77
std::vector< std::shared_ptr< Analyzer::Expr > > target_exprs_owned_
virtual std::string toString(RelRexToStringConfig config) const =0
const RelAlgNode * getInput(const size_t idx) const
Definition: RelAlgDag.h:826
DEVICE auto accumulate(ARGS &&...args)
Definition: gpu_enabled.h:42
std::vector< std::shared_ptr< Analyzer::Expr > > target_exprs_for_union(RelAlgNode const *input_node)
std::vector< TargetMetaInfo > get_targets_meta(const RA *ra_node, const std::vector< Analyzer::Expr * > &target_exprs)
static RegisteredQueryHint defaults()
Definition: QueryHint.h:247
std::shared_ptr< const query_state::QueryState > query_state_
static RelRexToStringConfig defaults()
Definition: RelAlgDag.h:49
Catalog_Namespace::Catalog & cat_
size_t g_default_max_groups_buffer_entry_guess
Definition: Execute.cpp:109
#define CHECK(condition)
Definition: Logger.h:222
std::vector< InputTableInfo > get_table_infos(const std::vector< InputDescriptor > &input_descs, Executor *executor)
void setOutputMetainfo(const std::vector< TargetMetaInfo > &targets_metainfo) const
Definition: RelAlgDag.h:795
PrintContainer< CONTAINER > printContainer(CONTAINER &container)
Definition: misc.h:107
std::tuple< std::vector< InputDescriptor >, std::list< std::shared_ptr< const InputColDescriptor > >, std::vector< std::shared_ptr< RexInput > > > get_input_desc(const RA *ra_node, const std::unordered_map< const RelAlgNode *, int > &input_to_nest_level, const std::vector< size_t > &input_permutation, const Catalog_Namespace::Catalog &cat)
const std::vector< TargetMetaInfo > & getOutputMetainfo() const
Definition: RelAlgDag.h:810
Executor * executor_
#define VLOG(n)
Definition: Logger.h:316

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

std::unique_ptr< WindowFunctionContext > RelAlgExecutor::createWindowFunctionContext ( const Analyzer::WindowFunction *  window_func,
const std::shared_ptr< Analyzer::BinOper > &  partition_key_cond,
std::unordered_map< QueryPlanHash, std::shared_ptr< HashJoin >> &  partition_cache,
std::unordered_map< QueryPlanHash, size_t > &  sorted_partition_key_ref_count_map,
const WorkUnit work_unit,
const std::vector< InputTableInfo > &  query_infos,
const CompilationOptions co,
ColumnCacheMap column_cache_map,
std::shared_ptr< RowSetMemoryOwner >  row_set_mem_owner 
)
private

Definition at line 2514 of file RelAlgExecutor.cpp.

References RegisteredQueryHint::aggregate_tree_fanout, RelAlgExecutor::WorkUnit::body, CHECK, CHECK_EQ, Data_Namespace::CPU_LEVEL, CompilationOptions::device_type, RelAlgExecutor::WorkUnit::exe_unit, executor_, g_window_function_aggregation_tree_fanout, Analyzer::WindowFunction::getArgs(), Analyzer::WindowFunction::getCollation(), ColumnFetcher::getOneColumnFragment(), Analyzer::WindowFunction::getOrderKeys(), RelAlgNode::getQueryPlanDagHash(), GPU, Data_Namespace::GPU_LEVEL, RelAlgExecutionUnit::hash_table_build_plan_dag, INVALID, OneToMany, RelAlgExecutionUnit::query_hint, RelAlgExecutionUnit::table_id_to_node_map, and VLOG.

Referenced by computeWindow().

2523  {
2524  const size_t elem_count = query_infos.front().info.fragments.front().getNumTuples();
2525  const auto memory_level = co.device_type == ExecutorDeviceType::GPU
2528  std::unique_ptr<WindowFunctionContext> context;
2529  auto partition_cache_key = work_unit.body->getQueryPlanDagHash();
2530  if (partition_key_cond) {
2531  auto partition_cond_str = partition_key_cond->toString();
2532  auto partition_key_hash = boost::hash_value(partition_cond_str);
2533  boost::hash_combine(partition_cache_key, partition_key_hash);
2534  std::shared_ptr<HashJoin> partition_ptr;
2535  auto cached_hash_table_it = partition_cache.find(partition_cache_key);
2536  if (cached_hash_table_it != partition_cache.end()) {
2537  partition_ptr = cached_hash_table_it->second;
2538  VLOG(1) << "Reuse a hash table to compute window function context (key: "
2539  << partition_cache_key << ", partition condition: " << partition_cond_str
2540  << ")";
2541  } else {
2542  const auto hash_table_or_err = executor_->buildHashTableForQualifier(
2543  partition_key_cond,
2544  query_infos,
2545  memory_level,
2546  JoinType::INVALID, // for window function
2548  column_cache_map,
2549  work_unit.exe_unit.hash_table_build_plan_dag,
2550  work_unit.exe_unit.query_hint,
2551  work_unit.exe_unit.table_id_to_node_map);
2552  if (!hash_table_or_err.fail_reason.empty()) {
2553  throw std::runtime_error(hash_table_or_err.fail_reason);
2554  }
2555  CHECK(hash_table_or_err.hash_table->getHashType() == HashType::OneToMany);
2556  partition_ptr = hash_table_or_err.hash_table;
2557  CHECK(partition_cache.insert(std::make_pair(partition_cache_key, partition_ptr))
2558  .second);
2559  VLOG(1) << "Put a generated hash table for computing window function context to "
2560  "cache (key: "
2561  << partition_cache_key << ", partition condition: " << partition_cond_str
2562  << ")";
2563  }
2564  CHECK(partition_ptr);
2565  auto aggregate_tree_fanout = g_window_function_aggregation_tree_fanout;
2566  if (work_unit.exe_unit.query_hint.aggregate_tree_fanout != aggregate_tree_fanout) {
2567  aggregate_tree_fanout = work_unit.exe_unit.query_hint.aggregate_tree_fanout;
2568  VLOG(1) << "Aggregate tree's fanout is set to " << aggregate_tree_fanout;
2569  }
2570  context = std::make_unique<WindowFunctionContext>(window_func,
2571  partition_cache_key,
2572  partition_ptr,
2573  elem_count,
2574  co.device_type,
2575  row_set_mem_owner,
2576  aggregate_tree_fanout);
2577  } else {
2578  context = std::make_unique<WindowFunctionContext>(
2579  window_func, elem_count, co.device_type, row_set_mem_owner);
2580  }
2581  const auto& order_keys = window_func->getOrderKeys();
2582  if (!order_keys.empty()) {
2583  auto sorted_partition_cache_key = partition_cache_key;
2584  for (auto& order_key : order_keys) {
2585  boost::hash_combine(sorted_partition_cache_key, order_key->toString());
2586  }
2587  for (auto& collation : window_func->getCollation()) {
2588  boost::hash_combine(sorted_partition_cache_key, collation.toString());
2589  }
2590  context->setSortedPartitionCacheKey(sorted_partition_cache_key);
2591  auto cache_key_cnt_it =
2592  sorted_partition_key_ref_count_map.try_emplace(sorted_partition_cache_key, 1);
2593  if (!cache_key_cnt_it.second) {
2594  sorted_partition_key_ref_count_map[sorted_partition_cache_key] =
2595  cache_key_cnt_it.first->second + 1;
2596  }
2597 
2598  std::vector<std::shared_ptr<Chunk_NS::Chunk>> chunks_owner;
2599  for (const auto& order_key : order_keys) {
2600  const auto order_col =
2601  std::dynamic_pointer_cast<const Analyzer::ColumnVar>(order_key);
2602  if (!order_col) {
2603  throw std::runtime_error("Only order by columns supported for now");
2604  }
2605  const int8_t* column;
2606  size_t join_col_elem_count;
2607  std::tie(column, join_col_elem_count) =
2609  *order_col,
2610  query_infos.front().info.fragments.front(),
2611  memory_level,
2612  0,
2613  nullptr,
2614  /*thread_idx=*/0,
2615  chunks_owner,
2616  column_cache_map);
2617 
2618  CHECK_EQ(join_col_elem_count, elem_count);
2619  context->addOrderColumn(column, order_col->get_type_info(), chunks_owner);
2620  }
2621  }
2622  if (context->getWindowFunction()->hasFraming()) {
2623  // todo (yoonmin) : if we try to support generic window function expression without
2624  // extra project node, we need to revisit here b/c the current logic assumes that
2625  // window function expression has a single input source
2626  auto& window_function_expression_args = window_func->getArgs();
2627  std::vector<std::shared_ptr<Chunk_NS::Chunk>> chunks_owner;
2628  for (auto& expr : window_function_expression_args) {
2629  if (const auto arg_col_var =
2630  std::dynamic_pointer_cast<const Analyzer::ColumnVar>(expr)) {
2631  auto const [column, join_col_elem_count] = ColumnFetcher::getOneColumnFragment(
2632  executor_,
2633  *arg_col_var,
2634  query_infos.front().info.fragments.front(),
2635  memory_level,
2636  0,
2637  nullptr,
2638  /*thread_idx=*/0,
2639  chunks_owner,
2640  column_cache_map);
2641 
2642  CHECK_EQ(join_col_elem_count, elem_count);
2643  context->addColumnBufferForWindowFunctionExpression(column, chunks_owner);
2644  }
2645  }
2646  }
2647  return context;
2648 }
#define CHECK_EQ(x, y)
Definition: Logger.h:230
const std::vector< std::shared_ptr< Analyzer::Expr > > & getOrderKeys() const
Definition: Analyzer.h:2414
const std::vector< OrderEntry > & getCollation() const
Definition: Analyzer.h:2432
size_t g_window_function_aggregation_tree_fanout
const std::vector< std::shared_ptr< Analyzer::Expr > > & getArgs() const
Definition: Analyzer.h:2408
ExecutorDeviceType device_type
static std::pair< const int8_t *, size_t > getOneColumnFragment(Executor *executor, const Analyzer::ColumnVar &hash_col, const Fragmenter_Namespace::FragmentInfo &fragment, const Data_Namespace::MemoryLevel effective_mem_lvl, const int device_id, DeviceAllocator *device_allocator, const size_t thread_idx, std::vector< std::shared_ptr< Chunk_NS::Chunk >> &chunks_owner, ColumnCacheMap &column_cache)
Gets one chunk's pointer and element count on either CPU or GPU.
#define CHECK(condition)
Definition: Logger.h:222
Executor * executor_
#define VLOG(n)
Definition: Logger.h:316

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

RelAlgExecutor::WorkUnit RelAlgExecutor::createWorkUnit ( const RelAlgNode *  node,
const SortInfo &  sort_info,
const ExecutionOptions &  eo 
)
private

Definition at line 4162 of file RelAlgExecutor.cpp.

References createAggregateWorkUnit(), createCompoundWorkUnit(), createFilterWorkUnit(), createProjectWorkUnit(), RelRexToStringConfig::defaults(), logger::FATAL, ExecutionOptions::just_explain, LOG, and RelAlgNode::toString().

Referenced by createSortInputWorkUnit(), and getJoinInfo().

4164  {
4165  const auto compound = dynamic_cast<const RelCompound*>(node);
4166  if (compound) {
4167  return createCompoundWorkUnit(compound, sort_info, eo);
4168  }
4169  const auto project = dynamic_cast<const RelProject*>(node);
4170  if (project) {
4171  return createProjectWorkUnit(project, sort_info, eo);
4172  }
4173  const auto aggregate = dynamic_cast<const RelAggregate*>(node);
4174  if (aggregate) {
4175  return createAggregateWorkUnit(aggregate, sort_info, eo.just_explain);
4176  }
4177  const auto filter = dynamic_cast<const RelFilter*>(node);
4178  if (filter) {
4179  return createFilterWorkUnit(filter, sort_info, eo.just_explain);
4180  }
4181  LOG(FATAL) << "Unhandled node type: "
4183  return {};
4184 }
WorkUnit createProjectWorkUnit(const RelProject *, const SortInfo &, const ExecutionOptions &eo)
WorkUnit createAggregateWorkUnit(const RelAggregate *, const SortInfo &, const bool just_explain)
#define LOG(tag)
Definition: Logger.h:216
WorkUnit createCompoundWorkUnit(const RelCompound *, const SortInfo &, const ExecutionOptions &eo)
virtual std::string toString(RelRexToStringConfig config) const =0
static RelRexToStringConfig defaults()
Definition: RelAlgDag.h:49
WorkUnit createFilterWorkUnit(const RelFilter *, const SortInfo &, const bool just_explain)

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void RelAlgExecutor::eraseFromTemporaryTables ( const int  table_id)
inline private

Definition at line 397 of file RelAlgExecutor.h.

References temporary_tables_.

397 { temporary_tables_.erase(table_id); }
TemporaryTables temporary_tables_
ExecutionResult RelAlgExecutor::executeAggregate ( const RelAggregate *  aggregate,
const CompilationOptions co,
const ExecutionOptions eo,
RenderInfo *  render_info,
const int64_t  queue_time_ms 
)
private

Definition at line 2250 of file RelAlgExecutor.cpp.

References createAggregateWorkUnit(), DEBUG_TIMER, Default, executeWorkUnit(), RelAlgNode::getOutputMetainfo(), and ExecutionOptions::just_explain.

Referenced by executeRelAlgStep().

2254  {
2255  auto timer = DEBUG_TIMER(__func__);
2256  const auto work_unit = createAggregateWorkUnit(
2257  aggregate, {{}, SortAlgorithm::Default, 0, 0}, eo.just_explain);
2258  return executeWorkUnit(work_unit,
2259  aggregate->getOutputMetainfo(),
2260  true,
2261  co,
2262  eo,
2263  render_info,
2264  queue_time_ms);
2265 }
WorkUnit createAggregateWorkUnit(const RelAggregate *, const SortInfo &, const bool just_explain)
ExecutionResult executeWorkUnit(const WorkUnit &work_unit, const std::vector< TargetMetaInfo > &targets_meta, const bool is_agg, const CompilationOptions &co_in, const ExecutionOptions &eo_in, RenderInfo *, const int64_t queue_time_ms, const std::optional< size_t > previous_count=std::nullopt)
#define DEBUG_TIMER(name)
Definition: Logger.h:371
const std::vector< TargetMetaInfo > & getOutputMetainfo() const
Definition: RelAlgDag.h:810

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

ExecutionResult RelAlgExecutor::executeCompound ( const RelCompound *  compound,
const CompilationOptions co,
const ExecutionOptions eo,
RenderInfo *  render_info,
const int64_t  queue_time_ms 
)
private

Definition at line 2232 of file RelAlgExecutor.cpp.

References createCompoundWorkUnit(), DEBUG_TIMER, Default, executeWorkUnit(), RelAlgNode::getOutputMetainfo(), and RelCompound::isAggregate().

Referenced by executeRelAlgStep().

2236  {
2237  auto timer = DEBUG_TIMER(__func__);
2238  const auto work_unit =
2239  createCompoundWorkUnit(compound, {{}, SortAlgorithm::Default, 0, 0}, eo);
2240  CompilationOptions co_compound = co;
2241  return executeWorkUnit(work_unit,
2242  compound->getOutputMetainfo(),
2243  compound->isAggregate(),
2244  co_compound,
2245  eo,
2246  render_info,
2247  queue_time_ms);
2248 }
WorkUnit createCompoundWorkUnit(const RelCompound *, const SortInfo &, const ExecutionOptions &eo)
ExecutionResult executeWorkUnit(const WorkUnit &work_unit, const std::vector< TargetMetaInfo > &targets_meta, const bool is_agg, const CompilationOptions &co_in, const ExecutionOptions &eo_in, RenderInfo *, const int64_t queue_time_ms, const std::optional< size_t > previous_count=std::nullopt)
bool isAggregate() const
Definition: RelAlgDag.h:1742
#define DEBUG_TIMER(name)
Definition: Logger.h:371
const std::vector< TargetMetaInfo > & getOutputMetainfo() const
Definition: RelAlgDag.h:810

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void RelAlgExecutor::executeDelete ( const RelAlgNode *  node,
const CompilationOptions co,
const ExecutionOptions eo_in,
const int64_t  queue_time_ms 
)
private

Definition at line 2130 of file RelAlgExecutor.cpp.

References cat_, CHECK, CHECK_EQ, Executor::clearExternalCaches(), createCompoundWorkUnit(), createProjectWorkUnit(), DEBUG_TIMER, Default, RelRexToStringConfig::defaults(), dml_transaction_parameters_, executor_, CompilationOptions::filter_on_deleted_column, get_table_infos(), get_temporary_table(), Catalog_Namespace::Catalog::getDatabaseId(), Catalog_Namespace::Catalog::getDeletedColumn(), QueryExecutionError::getErrorCode(), getErrorMessageFromCode(), CompilationOptions::makeCpuOnly(), ExecutionOptions::output_columnar_hint, post_execution_callback_, table_is_temporary(), temporary_tables_, and StorageIOFacility::yieldDeleteCallback().

Referenced by executeRelAlgStep().

2133  {
2134  CHECK(node);
2135  auto timer = DEBUG_TIMER(__func__);
2136 
2137  auto execute_delete_for_node = [this, &co, &eo_in](const auto node,
2138  auto& work_unit,
2139  const bool is_aggregate) {
2140  auto* table_descriptor = node->getModifiedTableDescriptor();
2141  CHECK(table_descriptor);
2142  if (!table_descriptor->hasDeletedCol) {
2143  throw std::runtime_error(
2144  "DELETE queries are only supported on tables with the vacuum attribute set to "
2145  "'delayed'");
2146  }
2147 
2148  Executor::clearExternalCaches(false, table_descriptor, cat_.getDatabaseId());
2149 
2150  const auto table_infos = get_table_infos(work_unit.exe_unit, executor_);
2151 
2152  auto execute_delete_ra_exe_unit =
2153  [this, &table_infos, &table_descriptor, &eo_in, &co](const auto& exe_unit,
2154  const bool is_aggregate) {
2156  std::make_unique<DeleteTransactionParameters>(table_descriptor);
2157  auto delete_params = dynamic_cast<DeleteTransactionParameters*>(
2159  CHECK(delete_params);
2160  auto delete_callback = yieldDeleteCallback(*delete_params);
2162 
2163  auto eo = eo_in;
2164  if (dml_transaction_parameters_->tableIsTemporary()) {
2165  eo.output_columnar_hint = true;
2166  co_delete.filter_on_deleted_column =
2167  false; // project the entire delete column for columnar update
2168  } else {
2169  CHECK_EQ(exe_unit.target_exprs.size(), size_t(1));
2170  }
2171 
2172  try {
2173  auto table_update_metadata =
2174  executor_->executeUpdate(exe_unit,
2175  table_infos,
2176  table_descriptor,
2177  co_delete,
2178  eo,
2179  cat_,
2180  executor_->row_set_mem_owner_,
2181  delete_callback,
2182  is_aggregate);
2183  post_execution_callback_ = [table_update_metadata, this]() {
2184  dml_transaction_parameters_->finalizeTransaction(cat_);
2185  TableOptimizer table_optimizer{
2186  dml_transaction_parameters_->getTableDescriptor(), executor_, cat_};
2187  table_optimizer.vacuumFragmentsAboveMinSelectivity(table_update_metadata);
2188  };
2189  } catch (const QueryExecutionError& e) {
2190  throw std::runtime_error(getErrorMessageFromCode(e.getErrorCode()));
2191  }
2192  };
2193 
2194  if (table_is_temporary(table_descriptor)) {
2195  auto query_rewrite = std::make_unique<QueryRewriter>(table_infos, executor_);
2196  auto cd = cat_.getDeletedColumn(table_descriptor);
2197  CHECK(cd);
2198  auto delete_column_expr = makeExpr<Analyzer::ColumnVar>(
2199  cd->columnType, table_descriptor->tableId, cd->columnId, 0);
2200  const auto rewritten_exe_unit =
2201  query_rewrite->rewriteColumnarDelete(work_unit.exe_unit, delete_column_expr);
2202  execute_delete_ra_exe_unit(rewritten_exe_unit, is_aggregate);
2203  } else {
2204  execute_delete_ra_exe_unit(work_unit.exe_unit, is_aggregate);
2205  }
2206  };
2207 
2208  if (auto compound = dynamic_cast<const RelCompound*>(node)) {
2209  const auto work_unit =
2210  createCompoundWorkUnit(compound, {{}, SortAlgorithm::Default, 0, 0}, eo_in);
2211  execute_delete_for_node(compound, work_unit, compound->isAggregate());
2212  } else if (auto project = dynamic_cast<const RelProject*>(node)) {
2213  auto work_unit =
2214  createProjectWorkUnit(project, {{}, SortAlgorithm::Default, 0, 0}, eo_in);
2215  if (project->isSimple()) {
2216  CHECK_EQ(size_t(1), project->inputCount());
2217  const auto input_ra = project->getInput(0);
2218  if (dynamic_cast<const RelSort*>(input_ra)) {
2219  const auto& input_table =
2220  get_temporary_table(&temporary_tables_, -input_ra->getId());
2221  CHECK(input_table);
2222  work_unit.exe_unit.scan_limit = input_table->rowCount();
2223  }
2224  }
2225  execute_delete_for_node(project, work_unit, false);
2226  } else {
2227  throw std::runtime_error("Unsupported parent node for delete: " +
2228  node->toString(RelRexToStringConfig::defaults()));
2229  }
2230 }
#define CHECK_EQ(x, y)
Definition: Logger.h:230
std::optional< std::function< void()> > post_execution_callback_
int32_t getErrorCode() const
Definition: ErrorHandling.h:55
WorkUnit createProjectWorkUnit(const RelProject *, const SortInfo &, const ExecutionOptions &eo)
const ColumnDescriptor * getDeletedColumn(const TableDescriptor *td) const
Definition: Catalog.cpp:3661
StorageIOFacility::UpdateCallback yieldDeleteCallback(DeleteTransactionParameters &delete_parameters)
TemporaryTables temporary_tables_
Driver for running cleanup processes on a table. TableOptimizer provides functions for various cleanu...
WorkUnit createCompoundWorkUnit(const RelCompound *, const SortInfo &, const ExecutionOptions &eo)
const ResultSetPtr & get_temporary_table(const TemporaryTables *temporary_tables, const int table_id)
Definition: Execute.h:228
static CompilationOptions makeCpuOnly(const CompilationOptions &in)
int getDatabaseId() const
Definition: Catalog.h:298
bool table_is_temporary(const TableDescriptor *const td)
static RelRexToStringConfig defaults()
Definition: RelAlgDag.h:49
Catalog_Namespace::Catalog & cat_
#define CHECK(condition)
Definition: Logger.h:222
std::vector< InputTableInfo > get_table_infos(const std::vector< InputDescriptor > &input_descs, Executor *executor)
#define DEBUG_TIMER(name)
Definition: Logger.h:371
static void clearExternalCaches(bool for_update, const TableDescriptor *td, const int current_db_id)
Definition: Execute.h:391
static std::string getErrorMessageFromCode(const int32_t error_code)
std::unique_ptr< TransactionParameters > dml_transaction_parameters_
Executor * executor_

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

ExecutionResult RelAlgExecutor::executeFilter ( const RelFilter * filter,
const CompilationOptions & co,
const ExecutionOptions & eo,
RenderInfo * render_info,
const int64_t  queue_time_ms 
)
private

Definition at line 2650 of file RelAlgExecutor.cpp.

References createFilterWorkUnit(), DEBUG_TIMER, Default, executeWorkUnit(), RelAlgNode::getOutputMetainfo(), and ExecutionOptions::just_explain.

Referenced by executeRelAlgStep().

2654  {
2655  auto timer = DEBUG_TIMER(__func__);
2656  const auto work_unit =
2657  createFilterWorkUnit(filter, {{}, SortAlgorithm::Default, 0, 0}, eo.just_explain);
2658  return executeWorkUnit(
2659  work_unit, filter->getOutputMetainfo(), false, co, eo, render_info, queue_time_ms);
2660 }
ExecutionResult executeWorkUnit(const WorkUnit &work_unit, const std::vector< TargetMetaInfo > &targets_meta, const bool is_agg, const CompilationOptions &co_in, const ExecutionOptions &eo_in, RenderInfo *, const int64_t queue_time_ms, const std::optional< size_t > previous_count=std::nullopt)
#define DEBUG_TIMER(name)
Definition: Logger.h:371
const std::vector< TargetMetaInfo > & getOutputMetainfo() const
Definition: RelAlgDag.h:810
WorkUnit createFilterWorkUnit(const RelFilter *, const SortInfo &, const bool just_explain)

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

ExecutionResult RelAlgExecutor::executeLogicalValues ( const RelLogicalValues * logical_values,
const ExecutionOptions & eo 
)
private

Definition at line 2706 of file RelAlgExecutor.cpp.

References CPU, DEBUG_TIMER, executor_, RelLogicalValues::getNumRows(), RelLogicalValues::getTupleType(), kBIGINT, kCOUNT, kNULLT, Projection, query_mem_desc, and RelAlgNode::setOutputMetainfo().

Referenced by executeRelAlgStep().

2708  {
2709  auto timer = DEBUG_TIMER(__func__);
2711  logical_values->getNumRows(),
2713  /*is_table_function=*/false);
2714 
2715  auto tuple_type = logical_values->getTupleType();
2716  for (size_t i = 0; i < tuple_type.size(); ++i) {
2717  auto& target_meta_info = tuple_type[i];
2718  if (target_meta_info.get_type_info().is_varlen()) {
2719  throw std::runtime_error("Variable length types not supported in VALUES yet.");
2720  }
2721  if (target_meta_info.get_type_info().get_type() == kNULLT) {
2722  // replace w/ bigint
2723  tuple_type[i] =
2724  TargetMetaInfo(target_meta_info.get_resname(), SQLTypeInfo(kBIGINT, false));
2725  }
2726  query_mem_desc.addColSlotInfo(
2727  {std::make_tuple(tuple_type[i].get_type_info().get_size(), 8)});
2728  }
2729  logical_values->setOutputMetainfo(tuple_type);
2730 
2731  std::vector<TargetInfo> target_infos;
2732  for (const auto& tuple_type_component : tuple_type) {
2733  target_infos.emplace_back(TargetInfo{false,
2734  kCOUNT,
2735  tuple_type_component.get_type_info(),
2736  SQLTypeInfo(kNULLT, false),
2737  false,
2738  false,
2739  /*is_varlen_projection=*/false});
2740  }
2741 
2742  std::shared_ptr<ResultSet> rs{
2743  ResultSetLogicalValuesBuilder{logical_values,
2744  target_infos,
2747  executor_->getRowSetMemoryOwner(),
2748  executor_}
2749  .build()};
2750 
2751  return {rs, tuple_type};
2752 }
size_t getNumRows() const
Definition: RelAlgDag.h:2273
const std::vector< TargetMetaInfo > getTupleType() const
Definition: RelAlgDag.h:2234
Definition: sqldefs.h:77
#define DEBUG_TIMER(name)
Definition: Logger.h:371
void setOutputMetainfo(const std::vector< TargetMetaInfo > &targets_metainfo) const
Definition: RelAlgDag.h:795
Executor * executor_

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

ExecutionResult RelAlgExecutor::executeModify ( const RelModify * modify,
const ExecutionOptions & eo 
)
private

Definition at line 2802 of file RelAlgExecutor.cpp.

References CPU, DEBUG_TIMER, executor_, and ExecutionOptions::just_explain.

Referenced by executeRelAlgStep().

2803  {
2804  auto timer = DEBUG_TIMER(__func__);
2805  if (eo.just_explain) {
2806  throw std::runtime_error("EXPLAIN not supported for ModifyTable");
2807  }
2808 
2809  auto rs = std::make_shared<ResultSet>(TargetInfoList{},
2812  executor_->getRowSetMemoryOwner(),
2813  executor_->getCatalog(),
2814  executor_->blockSize(),
2815  executor_->gridSize());
2816 
2817  std::vector<TargetMetaInfo> empty_targets;
2818  return {rs, empty_targets};
2819 }
std::vector< TargetInfo > TargetInfoList
#define DEBUG_TIMER(name)
Definition: Logger.h:371
Executor * executor_

+ Here is the caller graph for this function:

void RelAlgExecutor::executePostExecutionCallback ( )

Definition at line 4155 of file RelAlgExecutor.cpp.

References post_execution_callback_, and VLOG.

4155  {
4156  if (post_execution_callback_) {
4157  VLOG(1) << "Running post execution callback.";
4158  (*post_execution_callback_)();
4159  }
4160 }
std::optional< std::function< void()> > post_execution_callback_
#define VLOG(n)
Definition: Logger.h:316
ExecutionResult RelAlgExecutor::executeProject ( const RelProject * project,
const CompilationOptions & co,
const ExecutionOptions & eo,
RenderInfo * render_info,
const int64_t  queue_time_ms,
const std::optional< size_t >  previous_count 
)
private

Definition at line 2280 of file RelAlgExecutor.cpp.

References CHECK, CHECK_EQ, CPU, createProjectWorkUnit(), DEBUG_TIMER, Default, executeWorkUnit(), get_temporary_table(), RelAlgNode::getInput(), RelAlgNode::getOutputMetainfo(), RelAlgNode::inputCount(), RelProject::isSimple(), and temporary_tables_.

Referenced by executeRelAlgStep().

2286  {
2287  auto timer = DEBUG_TIMER(__func__);
2288  auto work_unit = createProjectWorkUnit(project, {{}, SortAlgorithm::Default, 0, 0}, eo);
2289  CompilationOptions co_project = co;
2290  if (project->isSimple()) {
2291  CHECK_EQ(size_t(1), project->inputCount());
2292  const auto input_ra = project->getInput(0);
2293  if (dynamic_cast<const RelSort*>(input_ra)) {
2294  co_project.device_type = ExecutorDeviceType::CPU;
2295  const auto& input_table =
2296  get_temporary_table(&temporary_tables_, -input_ra->getId());
2297  CHECK(input_table);
2298  work_unit.exe_unit.scan_limit =
2299  std::min(input_table->getLimit(), input_table->rowCount());
2300  }
2301  }
2302  return executeWorkUnit(work_unit,
2303  project->getOutputMetainfo(),
2304  false,
2305  co_project,
2306  eo,
2307  render_info,
2308  queue_time_ms,
2309  previous_count);
2310 }
#define CHECK_EQ(x, y)
Definition: Logger.h:230
WorkUnit createProjectWorkUnit(const RelProject *, const SortInfo &, const ExecutionOptions &eo)
TemporaryTables temporary_tables_
const ResultSetPtr & get_temporary_table(const TemporaryTables *temporary_tables, const int table_id)
Definition: Execute.h:228
const RelAlgNode * getInput(const size_t idx) const
Definition: RelAlgDag.h:826
bool isSimple() const
Definition: RelAlgDag.h:1061
ExecutionResult executeWorkUnit(const WorkUnit &work_unit, const std::vector< TargetMetaInfo > &targets_meta, const bool is_agg, const CompilationOptions &co_in, const ExecutionOptions &eo_in, RenderInfo *, const int64_t queue_time_ms, const std::optional< size_t > previous_count=std::nullopt)
#define CHECK(condition)
Definition: Logger.h:222
#define DEBUG_TIMER(name)
Definition: Logger.h:371
const size_t inputCount() const
Definition: RelAlgDag.h:824
const std::vector< TargetMetaInfo > & getOutputMetainfo() const
Definition: RelAlgDag.h:810

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

ExecutionResult RelAlgExecutor::executeRelAlgQuery ( const CompilationOptions & co,
const ExecutionOptions & eo,
const bool  just_explain_plan,
RenderInfo * render_info 
)

Definition at line 540 of file RelAlgExecutor.cpp.

References CHECK, DEBUG_TIMER, executeRelAlgQueryNoRetry(), RenderInfo::forceNonInSitu(), g_allow_cpu_retry, logger::INFO, INJECT_TIMER, RelAlgDag::kBuiltOptimized, LOG, CompilationOptions::makeCpuOnly(), post_execution_callback_, query_dag_, run_benchmark::run_query(), and VLOG.

543  {
544  CHECK(query_dag_);
546  << static_cast<int>(query_dag_->getBuildState());
547 
548  auto timer = DEBUG_TIMER(__func__);
550 
551  auto run_query = [&](const CompilationOptions& co_in) {
552  auto execution_result =
553  executeRelAlgQueryNoRetry(co_in, eo, just_explain_plan, render_info);
554 
555  constexpr bool vlog_result_set_summary{false};
556  if constexpr (vlog_result_set_summary) {
557  VLOG(1) << execution_result.getRows()->summaryToString();
558  }
559 
560  if (post_execution_callback_) {
561  VLOG(1) << "Running post execution callback.";
562  (*post_execution_callback_)();
563  }
564  return execution_result;
565  };
566 
567  try {
568  return run_query(co);
569  } catch (const QueryMustRunOnCpu&) {
570  if (!g_allow_cpu_retry) {
571  throw;
572  }
573  }
574  LOG(INFO) << "Query unable to run in GPU mode, retrying on CPU";
575  auto co_cpu = CompilationOptions::makeCpuOnly(co);
576 
577  if (render_info) {
578  render_info->forceNonInSitu();
579  }
580  return run_query(co_cpu);
581 }
std::optional< std::function< void()> > post_execution_callback_
void forceNonInSitu()
Definition: RenderInfo.cpp:45
#define LOG(tag)
Definition: Logger.h:216
static CompilationOptions makeCpuOnly(const CompilationOptions &in)
#define INJECT_TIMER(DESC)
Definition: measure.h:93
ExecutionResult executeRelAlgQueryNoRetry(const CompilationOptions &co, const ExecutionOptions &eo, const bool just_explain_plan, RenderInfo *render_info)
std::unique_ptr< RelAlgDag > query_dag_
ExecutionResult executeRelAlgQuery(const CompilationOptions &co, const ExecutionOptions &eo, const bool just_explain_plan, RenderInfo *render_info)
#define CHECK(condition)
Definition: Logger.h:222
#define DEBUG_TIMER(name)
Definition: Logger.h:371
bool g_allow_cpu_retry
Definition: Execute.cpp:86
#define VLOG(n)
Definition: Logger.h:316

+ Here is the call graph for this function:

ExecutionResult RelAlgExecutor::executeRelAlgQueryNoRetry ( const CompilationOptions & co,
const ExecutionOptions & eo,
const bool  just_explain_plan,
RenderInfo * render_info 
)
private

Definition at line 583 of file RelAlgExecutor.cpp.

References ExecutionOptions::allow_runtime_query_interrupt, cat_, CHECK, cleanupPostExecution(), DEBUG_TIMER, Executor::ERR_INTERRUPTED, executeRelAlgQueryWithFilterPushDown(), executeRelAlgSeq(), executor_, ExecutionOptions::find_push_down_candidates, g_enable_dynamic_watchdog, QueryExecutionError::getErrorCode(), getSubqueries(), INJECT_TIMER, join(), ExecutionOptions::just_calcite_explain, ExecutionOptions::just_explain, ExecutionOptions::just_validate, anonymous_namespace{RelAlgExecutor.cpp}::prepare_for_system_table_execution(), anonymous_namespace{RelAlgExecutor.cpp}::prepare_foreign_table_for_execution(), query_dag_, query_state_, run_benchmark_import::result, gpu_enabled::reverse(), setupCaching(), RelRexToStringConfig::skip_input_nodes, gpu_enabled::sort(), timer_start(), timer_stop(), and to_string().

Referenced by executeRelAlgQuery().

586  {
588  auto timer = DEBUG_TIMER(__func__);
589  auto timer_setup = DEBUG_TIMER("Query pre-execution steps");
590 
591  query_dag_->resetQueryExecutionState();
592  const auto& ra = query_dag_->getRootNode();
593 
594  // capture the lock acquistion time
595  auto clock_begin = timer_start();
597  executor_->resetInterrupt();
598  }
599  std::string query_session{""};
600  std::string query_str{"N/A"};
601  std::string query_submitted_time{""};
602  // gather necessary query's info
603  if (query_state_ != nullptr && query_state_->getConstSessionInfo() != nullptr) {
604  query_session = query_state_->getConstSessionInfo()->get_session_id();
605  query_str = query_state_->getQueryStr();
606  query_submitted_time = query_state_->getQuerySubmittedTime();
607  }
608 
609  auto validate_or_explain_query =
610  just_explain_plan || eo.just_validate || eo.just_explain || eo.just_calcite_explain;
611  auto interruptable = !render_info && !query_session.empty() &&
612  eo.allow_runtime_query_interrupt && !validate_or_explain_query;
613  if (interruptable) {
614  // if we reach here, the current query which was waiting an idle executor
615  // within the dispatch queue is now scheduled to the specific executor
616  // (not UNITARY_EXECUTOR)
617  // so we update the query session's status with the executor that takes this query
618  std::tie(query_session, query_str) = executor_->attachExecutorToQuerySession(
619  query_session, query_str, query_submitted_time);
620 
621  // now the query is going to be executed, so update the status as
622  // "RUNNING_QUERY_KERNEL"
623  executor_->updateQuerySessionStatus(
624  query_session,
625  query_submitted_time,
626  QuerySessionStatus::QueryStatus::RUNNING_QUERY_KERNEL);
627  }
628 
629  // so it should do cleanup session info after finishing its execution
630  ScopeGuard clearQuerySessionInfo =
631  [this, &query_session, &interruptable, &query_submitted_time] {
632  // reset the runtime query interrupt status after the end of query execution
633  if (interruptable) {
634  // cleanup running session's info
635  executor_->clearQuerySessionStatus(query_session, query_submitted_time);
636  }
637  };
638 
639  auto acquire_execute_mutex = [](Executor * executor) -> auto {
640  auto ret = executor->acquireExecuteMutex();
641  return ret;
642  };
643  // now we acquire executor lock in here to make sure that this executor holds
644  // all necessary resources and at the same time protect them against other executor
645  auto lock = acquire_execute_mutex(executor_);
646 
647  if (interruptable) {
648  // check whether this query session is "already" interrupted
649  // this case occurs when there is very short gap between being interrupted and
650  // taking the execute lock
651  // if so we have to remove "all" queries initiated by this session and we do in here
652  // without running the query
653  try {
654  executor_->checkPendingQueryStatus(query_session);
655  } catch (QueryExecutionError& e) {
657  throw std::runtime_error("Query execution has been interrupted (pending query)");
658  }
659  throw e;
660  } catch (...) {
661  throw std::runtime_error("Checking pending query status failed: unknown error");
662  }
663  }
664  int64_t queue_time_ms = timer_stop(clock_begin);
665 
667 
668  // Notify foreign tables to load prior to caching
670 
671  ScopeGuard row_set_holder = [this] { cleanupPostExecution(); };
672  setupCaching(&ra);
673 
674  ScopeGuard restore_metainfo_cache = [this] { executor_->clearMetaInfoCache(); };
675  auto ed_seq = RaExecutionSequence(&ra, executor_);
676 
677  if (just_explain_plan) {
678  std::stringstream ss;
679  std::vector<const RelAlgNode*> nodes;
680  for (size_t i = 0; i < ed_seq.size(); i++) {
681  nodes.emplace_back(ed_seq.getDescriptor(i)->getBody());
682  }
683  size_t ctr_node_id_in_plan = nodes.size();
684  for (auto& body : boost::adaptors::reverse(nodes)) {
685  // we set each node's id in the query plan in advance before calling toString
686  // method to properly represent the query plan
687  auto node_id_in_plan_tree = ctr_node_id_in_plan--;
688  body->setIdInPlanTree(node_id_in_plan_tree);
689  }
690  size_t ctr = nodes.size();
691  size_t tab_ctr = 0;
692  RelRexToStringConfig config;
693  config.skip_input_nodes = true;
694  for (auto& body : boost::adaptors::reverse(nodes)) {
695  const auto index = ctr--;
696  const auto tabs = std::string(tab_ctr++, '\t');
697  CHECK(body);
698  ss << tabs << std::to_string(index) << " : " << body->toString(config) << "\n";
699  if (auto sort = dynamic_cast<const RelSort*>(body)) {
700  ss << tabs << " : " << sort->getInput(0)->toString(config) << "\n";
701  }
702  if (dynamic_cast<const RelProject*>(body) ||
703  dynamic_cast<const RelCompound*>(body)) {
704  if (auto join = dynamic_cast<const RelLeftDeepInnerJoin*>(body->getInput(0))) {
705  ss << tabs << " : " << join->toString(config) << "\n";
706  }
707  }
708  }
709  const auto& subqueries = getSubqueries();
710  if (!subqueries.empty()) {
711  ss << "Subqueries: "
712  << "\n";
713  for (const auto& subquery : subqueries) {
714  const auto ra = subquery->getRelAlg();
715  ss << "\t" << ra->toString(config) << "\n";
716  }
717  }
718  auto rs = std::make_shared<ResultSet>(ss.str());
719  return {rs, {}};
720  }
721 
722  if (eo.find_push_down_candidates) {
723  // this extra logic is mainly due to current limitations on multi-step queries
724  // and/or subqueries.
726  ed_seq, co, eo, render_info, queue_time_ms);
727  }
728  timer_setup.stop();
729 
730  // Dispatch the subqueries first
731  for (auto subquery : getSubqueries()) {
732  const auto subquery_ra = subquery->getRelAlg();
733  CHECK(subquery_ra);
734  if (subquery_ra->hasContextData()) {
735  continue;
736  }
737  // Execute the subquery and cache the result.
738  RelAlgExecutor ra_executor(executor_, cat_, query_state_);
739  RaExecutionSequence subquery_seq(subquery_ra, executor_);
740  auto result = ra_executor.executeRelAlgSeq(subquery_seq, co, eo, nullptr, 0);
741  subquery->setExecutionResult(std::make_shared<ExecutionResult>(result));
742  }
743  return executeRelAlgSeq(ed_seq, co, eo, render_info, queue_time_ms);
744 }
int32_t getErrorCode() const
Definition: ErrorHandling.h:55
void prepare_foreign_table_for_execution(const RelAlgNode &ra_node, const Catalog_Namespace::Catalog &catalog)
void prepare_for_system_table_execution(const RelAlgNode &ra_node, const Catalog_Namespace::Catalog &catalog, const CompilationOptions &co)
static const int32_t ERR_INTERRUPTED
Definition: Execute.h:1378
ExecutionResult executeRelAlgSeq(const RaExecutionSequence &seq, const CompilationOptions &co, const ExecutionOptions &eo, RenderInfo *render_info, const int64_t queue_time_ms, const bool with_existing_temp_tables=false)
ExecutionResult executeRelAlgQueryWithFilterPushDown(const RaExecutionSequence &seq, const CompilationOptions &co, const ExecutionOptions &eo, RenderInfo *render_info, const int64_t queue_time_ms)
void setupCaching(const RelAlgNode *ra)
std::string join(T const &container, std::string const &delim)
DEVICE void sort(ARGS &&...args)
Definition: gpu_enabled.h:105
TypeR::rep timer_stop(Type clock_begin)
Definition: measure.h:48
bool g_enable_dynamic_watchdog
Definition: Execute.cpp:80
const std::vector< std::shared_ptr< RexSubQuery > > & getSubqueries() const noexcept
std::string to_string(char const *&&v)
#define INJECT_TIMER(DESC)
Definition: measure.h:93
A container for relational algebra descriptors defining the execution order for a relational algebra ...
ExecutionResult executeRelAlgQueryNoRetry(const CompilationOptions &co, const ExecutionOptions &eo, const bool just_explain_plan, RenderInfo *render_info)
std::unique_ptr< RelAlgDag > query_dag_
std::shared_ptr< const query_state::QueryState > query_state_
Catalog_Namespace::Catalog & cat_
#define CHECK(condition)
Definition: Logger.h:222
#define DEBUG_TIMER(name)
Definition: Logger.h:371
void cleanupPostExecution()
DEVICE void reverse(ARGS &&...args)
Definition: gpu_enabled.h:96
Executor * executor_
Type timer_start()
Definition: measure.h:42

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

QueryStepExecutionResult RelAlgExecutor::executeRelAlgQuerySingleStep ( const RaExecutionSequence & seq,
const size_t  step_idx,
const CompilationOptions & co,
const ExecutionOptions & eo,
RenderInfo * render_info 
)

Definition at line 796 of file RelAlgExecutor.cpp.

References ExecutionOptions::allow_loop_joins, ExecutionOptions::allow_multifrag, ExecutionOptions::allow_runtime_query_interrupt, CHECK, CHECK_EQ, anonymous_namespace{RelAlgExecutor.cpp}::check_sort_node_source_constraint(), createSortInputWorkUnit(), ExecutionOptions::dynamic_watchdog_time_limit, executeRelAlgSubSeq(), executor_, ExecutionOptions::executor_type, ExecutionOptions::find_push_down_candidates, anonymous_namespace{RelAlgExecutor.cpp}::get_order_entries(), RaExecutionSequence::getDescriptor(), ExecutionOptions::gpu_input_mem_limit_percent, INJECT_TIMER, ExecutionOptions::jit_debug, ExecutionOptions::just_calcite_explain, ExecutionOptions::just_explain, ExecutionOptions::just_validate, ExecutionOptions::keep_result, anonymous_namespace{RelAlgExecutor.cpp}::node_is_aggregate(), ExecutionOptions::output_columnar_hint, ExecutionOptions::pending_query_interrupt_freq, post_execution_callback_, queue_time_ms_, Reduce, run_benchmark_import::result, ExecutionOptions::running_query_interrupt_freq, GroupByAndAggregate::shard_count_for_top_groups(), gpu_enabled::sort(), Union, VLOG, ExecutionOptions::with_dynamic_watchdog, and ExecutionOptions::with_watchdog.

801  {
802  INJECT_TIMER(executeRelAlgQueryStep);
803 
804  auto exe_desc_ptr = seq.getDescriptor(step_idx);
805  CHECK(exe_desc_ptr);
806  const auto sort = dynamic_cast<const RelSort*>(exe_desc_ptr->getBody());
807 
808  size_t shard_count{0};
809  auto merge_type = [&shard_count](const RelAlgNode* body) -> MergeType {
810  return node_is_aggregate(body) && !shard_count ? MergeType::Reduce : MergeType::Union;
811  };
812 
813  if (sort) {
815  auto order_entries = get_order_entries(sort);
816  const auto source_work_unit = createSortInputWorkUnit(sort, order_entries, eo);
818  source_work_unit.exe_unit, *executor_->getCatalog());
819  if (!shard_count) {
820  // No point in sorting on the leaf, only execute the input to the sort node.
821  CHECK_EQ(size_t(1), sort->inputCount());
822  const auto source = sort->getInput(0);
823  if (sort->collationCount() || node_is_aggregate(source)) {
824  auto temp_seq = RaExecutionSequence(std::make_unique<RaExecutionDesc>(source));
825  CHECK_EQ(temp_seq.size(), size_t(1));
826  ExecutionOptions eo_copy = {
828  eo.keep_result,
829  eo.allow_multifrag,
830  eo.just_explain,
831  eo.allow_loop_joins,
832  eo.with_watchdog,
833  eo.jit_debug,
834  eo.just_validate || sort->isEmptyResult(),
843  eo.executor_type,
844  };
845  // Use subseq to avoid clearing existing temporary tables
846  return {
847  executeRelAlgSubSeq(temp_seq, std::make_pair(0, 1), co, eo_copy, nullptr, 0),
848  merge_type(source),
849  source->getId(),
850  false};
851  }
852  }
853  }
856  std::make_pair(step_idx, step_idx + 1),
857  co,
858  eo,
859  render_info,
861  merge_type(exe_desc_ptr->getBody()),
862  exe_desc_ptr->getBody()->getId(),
863  false};
864  if (post_execution_callback_) {
865  VLOG(1) << "Running post execution callback.";
866  (*post_execution_callback_)();
867  }
868  return result;
869 }
int64_t queue_time_ms_
#define CHECK_EQ(x, y)
Definition: Logger.h:230
std::optional< std::function< void()> > post_execution_callback_
RaExecutionDesc * getDescriptor(size_t idx) const
DEVICE void sort(ARGS &&...args)
Definition: gpu_enabled.h:105
void check_sort_node_source_constraint(const RelSort *sort)
double running_query_interrupt_freq
ExecutorType executor_type
#define INJECT_TIMER(DESC)
Definition: measure.h:93
MergeType
A container for relational algebra descriptors defining the execution order for a relational algebra ...
ExecutionResult executeRelAlgSubSeq(const RaExecutionSequence &seq, const std::pair< size_t, size_t > interval, const CompilationOptions &co, const ExecutionOptions &eo, RenderInfo *render_info, const int64_t queue_time_ms)
unsigned pending_query_interrupt_freq
bool node_is_aggregate(const RelAlgNode *ra)
#define CHECK(condition)
Definition: Logger.h:222
double gpu_input_mem_limit_percent
std::list< Analyzer::OrderEntry > get_order_entries(const RelSort *sort)
unsigned dynamic_watchdog_time_limit
Executor * executor_
#define VLOG(n)
Definition: Logger.h:316
static size_t shard_count_for_top_groups(const RelAlgExecutionUnit &ra_exe_unit, const Catalog_Namespace::Catalog &catalog)
WorkUnit createSortInputWorkUnit(const RelSort *, std::list< Analyzer::OrderEntry > &order_entries, const ExecutionOptions &eo)

+ Here is the call graph for this function:

ExecutionResult RelAlgExecutor::executeRelAlgQueryWithFilterPushDown ( const RaExecutionSequence & seq,
const CompilationOptions & co,
const ExecutionOptions & eo,
RenderInfo * render_info,
const int64_t  queue_time_ms 
)

Definition at line 146 of file JoinFilterPushDown.cpp.

References ExecutionOptions::allow_loop_joins, ExecutionOptions::allow_multifrag, ExecutionOptions::allow_runtime_query_interrupt, cat_, CHECK, ExecutionOptions::dynamic_watchdog_time_limit, executeRelAlgSeq(), executor_, ExecutionOptions::find_push_down_candidates, getSubqueries(), ExecutionOptions::gpu_input_mem_limit_percent, ExecutionOptions::jit_debug, ExecutionOptions::just_calcite_explain, ExecutionOptions::just_explain, ExecutionOptions::just_validate, ExecutionOptions::keep_result, ExecutionOptions::output_columnar_hint, ExecutionOptions::pending_query_interrupt_freq, run_benchmark_import::result, ExecutionOptions::running_query_interrupt_freq, RaExecutionSequence::size(), ExecutionOptions::with_dynamic_watchdog, and ExecutionOptions::with_watchdog.

Referenced by executeRelAlgQueryNoRetry().

151  {
152  // we currently do not fully support filter push down with
153  // multi-step execution and/or with subqueries
154  // TODO(Saman): add proper caching to enable filter push down for all cases
155  const auto& subqueries = getSubqueries();
156  if (seq.size() > 1 || !subqueries.empty()) {
157  if (eo.just_calcite_explain) {
158  return ExecutionResult(std::vector<PushedDownFilterInfo>{},
160  }
161  const ExecutionOptions eo_modified{eo.output_columnar_hint,
162  eo.keep_result,
163  eo.allow_multifrag,
164  eo.just_explain,
165  eo.allow_loop_joins,
166  eo.with_watchdog,
167  eo.jit_debug,
168  eo.just_validate,
171  /*find_push_down_candidates=*/false,
172  /*just_calcite_explain=*/false,
177 
178  // Dispatch the subqueries first
179  for (auto subquery : subqueries) {
180  // Execute the subquery and cache the result.
181  RelAlgExecutor ra_executor(executor_, cat_);
182  const auto subquery_ra = subquery->getRelAlg();
183  CHECK(subquery_ra);
184  RaExecutionSequence subquery_seq(subquery_ra, executor_);
185  auto result =
186  ra_executor.executeRelAlgSeq(subquery_seq, co, eo_modified, nullptr, 0);
187  subquery->setExecutionResult(std::make_shared<ExecutionResult>(result));
188  }
189  return executeRelAlgSeq(seq, co, eo_modified, render_info, queue_time_ms);
190  }
191  // else
192  return executeRelAlgSeq(seq, co, eo, render_info, queue_time_ms);
193 }
ExecutionResult executeRelAlgSeq(const RaExecutionSequence &seq, const CompilationOptions &co, const ExecutionOptions &eo, RenderInfo *render_info, const int64_t queue_time_ms, const bool with_existing_temp_tables=false)
const std::vector< std::shared_ptr< RexSubQuery > > & getSubqueries() const noexcept
double running_query_interrupt_freq
A container for relational algebra descriptors defining the execution order for a relational algebra ...
unsigned pending_query_interrupt_freq
Catalog_Namespace::Catalog & cat_
#define CHECK(condition)
Definition: Logger.h:222
double gpu_input_mem_limit_percent
unsigned dynamic_watchdog_time_limit
Executor * executor_

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

ExecutionResult RelAlgExecutor::executeRelAlgSeq ( const RaExecutionSequence seq,
const CompilationOptions co,
const ExecutionOptions eo,
RenderInfo render_info,
const int64_t  queue_time_ms,
const bool  with_existing_temp_tables = false 
)

Definition at line 888 of file RelAlgExecutor.cpp.

References addTemporaryTable(), cat_, CHECK, CHECK_GE, DEBUG_TIMER, CompilationOptions::device_type, RaExecutionSequence::empty(), executeRelAlgStep(), executor_, RenderInfo::forceNonInSitu(), g_allow_query_step_cpu_retry, g_allow_query_step_skipping, g_cluster, g_enable_interop, RaExecutionDesc::getBody(), RaExecutionSequence::getDescriptor(), RaExecutionSequence::getSkippedQueryStepCacheKeys(), GPU, RaExecutionSequence::hasQueryStepForUnion(), logger::INFO, INJECT_TIMER, ExecutionOptions::just_explain, ExecutionOptions::keep_result, left_deep_join_info_, LOG, CompilationOptions::makeCpuOnly(), now_, RaExecutionSequence::size(), gpu_enabled::swap(), target_exprs_owned_, temporary_tables_, and VLOG.

Referenced by executeRelAlgQueryNoRetry(), and executeRelAlgQueryWithFilterPushDown().

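893  {
894  INJECT_TIMER(executeRelAlgSeq);
895  auto timer = DEBUG_TIMER(__func__);
896  if (!with_existing_temp_tables) {
897  decltype(temporary_tables_)().swap(temporary_tables_);
898  }
899  decltype(target_exprs_owned_)().swap(target_exprs_owned_);
900  decltype(left_deep_join_info_)().swap(left_deep_join_info_);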
901  executor_->setCatalog(&cat_);
902  executor_->temporary_tables_ = &temporary_tables_;
903 
904  time(&now_);
905  CHECK(!seq.empty());
906 
907  auto get_descriptor_count = [&seq, &eo]() -> size_t {
908  if (eo.just_explain) {
909  if (dynamic_cast<const RelLogicalValues*>(seq.getDescriptor(0)->getBody())) {
910  // run the logical values descriptor to generate the result set, then the next
911  // descriptor to generate the explain
912  CHECK_GE(seq.size(), size_t(2));
913  return 2;
914  } else {
915  return 1;
916  }
917  } else {
918  return seq.size();
919  }
920  };
921 
922  const auto exec_desc_count = get_descriptor_count();
923  auto eo_copied = eo;
924  if (seq.hasQueryStepForUnion()) {
925  // we currently do not support resultset recycling when an input query
926  // contains union (all) operation
927  eo_copied.keep_result = false;
928  }
929 
930  // we have to register resultset(s) of the skipped query step(s) as temporary table
931  // before executing the remaining query steps
932  // since they may be required during the query processing
933  // i.e., get metadata of the target expression from the skipped query step
934  if (g_allow_query_step_skipping) {
935  for (const auto& kv : seq.getSkippedQueryStepCacheKeys()) {
936  const auto cached_res =
937  executor_->getRecultSetRecyclerHolder().getCachedQueryResultSet(kv.second);
938  CHECK(cached_res);
939  addTemporaryTable(kv.first, cached_res);
940  }
941  }
942 
943  const auto num_steps = exec_desc_count - 1;
944  for (size_t i = 0; i < exec_desc_count; i++) {
945  VLOG(1) << "Executing query step " << i << " / " << num_steps;
946  try {
947  executeRelAlgStep(
948  seq, i, co, eo_copied, (i == num_steps) ? render_info : nullptr, queue_time_ms);
949  } catch (const QueryMustRunOnCpu&) {
950  // Do not allow per-step retry if flag is off or in distributed mode
951  // TODO(todd): Determine if and when we can relax this restriction
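952  // for distributed
953  CHECK(co.device_type == ExecutorDeviceType::GPU);
954  if (!g_allow_query_step_cpu_retry || g_cluster) {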
955  throw;
956  }
957  LOG(INFO) << "Retrying current query step " << i << " / " << num_steps << " on CPU";
958  const auto co_cpu = CompilationOptions::makeCpuOnly(co);
959  if (render_info && i == num_steps) {
960  // only render on the last step
961  render_info->forceNonInSitu();
962  }
963  executeRelAlgStep(seq,
964  i,
965  co_cpu,
966  eo_copied,
967  (i == num_steps) ? render_info : nullptr,
968  queue_time_ms);
969  } catch (const NativeExecutionError&) {
970  if (!g_enable_interop) {
971  throw;
972  }
973  auto eo_extern = eo_copied;
974  eo_extern.executor_type = ::ExecutorType::Extern;
975  auto exec_desc_ptr = seq.getDescriptor(i);
976  const auto body = exec_desc_ptr->getBody();
977  const auto compound = dynamic_cast<const RelCompound*>(body);
978  if (compound && (compound->getGroupByCount() || compound->isAggregate())) {
979  LOG(INFO) << "Also failed to run the query using interoperability";
980  throw;
981  }
982  executeRelAlgStep(
983  seq, i, co, eo_extern, (i == num_steps) ? render_info : nullptr, queue_time_ms);
984  }
985  }
986 
987  return seq.getDescriptor(num_steps)->getResult();
988 }
RaExecutionDesc * getDescriptor(size_t idx) const
ExecutionResult executeRelAlgSeq(const RaExecutionSequence &seq, const CompilationOptions &co, const ExecutionOptions &eo, RenderInfo *render_info, const int64_t queue_time_ms, const bool with_existing_temp_tables=false)
void forceNonInSitu()
Definition: RenderInfo.cpp:45
const bool hasQueryStepForUnion() const
#define LOG(tag)
Definition: Logger.h:216
bool g_allow_query_step_skipping
Definition: Execute.cpp:151
const std::unordered_map< int, QueryPlanHash > getSkippedQueryStepCacheKeys() const
TemporaryTables temporary_tables_
#define CHECK_GE(x, y)
Definition: Logger.h:235
void addTemporaryTable(const int table_id, const ResultSetPtr &result)
bool g_enable_interop
std::vector< std::shared_ptr< Analyzer::Expr > > target_exprs_owned_
static CompilationOptions makeCpuOnly(const CompilationOptions &in)
#define INJECT_TIMER(DESC)
Definition: measure.h:93
void executeRelAlgStep(const RaExecutionSequence &seq, const size_t step_idx, const CompilationOptions &, const ExecutionOptions &, RenderInfo *, const int64_t queue_time_ms)
ExecutorDeviceType device_type
std::unordered_map< unsigned, JoinQualsPerNestingLevel > left_deep_join_info_
Catalog_Namespace::Catalog & cat_
#define CHECK(condition)
Definition: Logger.h:222
#define DEBUG_TIMER(name)
Definition: Logger.h:371
const RelAlgNode * getBody() const
bool g_allow_query_step_cpu_retry
Definition: Execute.cpp:87
bool g_cluster
Executor * executor_
DEVICE void swap(ARGS &&...args)
Definition: gpu_enabled.h:114
#define VLOG(n)
Definition: Logger.h:316

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void RelAlgExecutor::executeRelAlgStep ( const RaExecutionSequence seq,
const size_t  step_idx,
const CompilationOptions co,
const ExecutionOptions eo,
RenderInfo render_info,
const int64_t  queue_time_ms 
)
private

Definition at line 1036 of file RelAlgExecutor.cpp.

References addTemporaryTable(), ExecutionOptions::allow_loop_joins, ExecutionOptions::allow_multifrag, ExecutionOptions::allow_runtime_query_interrupt, anonymous_namespace{RelAlgExecutor.cpp}::build_render_targets(), canUseResultsetCache(), CHECK, CPU, DEBUG_TIMER, RelRexToStringConfig::defaults(), CompilationOptions::device_type, ExecutionOptions::dynamic_watchdog_time_limit, executeAggregate(), executeCompound(), executeDelete(), executeFilter(), executeLogicalValues(), executeModify(), executeProject(), executeSort(), executeTableFunction(), executeUnion(), executeUpdate(), executor_, ExecutionOptions::executor_type, logger::FATAL, ExecutionOptions::find_push_down_candidates, g_cluster, g_enable_data_recycler, g_skip_intermediate_count, g_use_query_resultset_cache, RelAlgNode::getContextData(), RaExecutionSequence::getDescriptor(), RaExecutionSequence::getDescriptorByBodyId(), RelAlgNode::getId(), RelAlgNode::getInput(), getParsedQueryHint(), ExecutionOptions::gpu_input_mem_limit_percent, handleNop(), anonymous_namespace{RelAlgExecutor.cpp}::has_valid_query_plan_dag(), RelAlgNode::hasContextData(), RaExecutionSequence::hasQueryStepForUnion(), logger::INFO, INJECT_TIMER, ExecutionOptions::jit_debug, ExecutionOptions::just_calcite_explain, ExecutionOptions::just_explain, ExecutionOptions::just_validate, kColumnarOutput, kCpuMode, ExecutionOptions::keep_result, kKeepResult, kKeepTableFuncResult, kRowwiseOutput, LOG, ExecutionOptions::outer_fragment_indices, ExecutionOptions::output_columnar_hint, ExecutionOptions::pending_query_interrupt_freq, ExecutionOptions::running_query_interrupt_freq, setHasStepForUnion(), gpu_enabled::sort(), VLOG, ExecutionOptions::with_dynamic_watchdog, and ExecutionOptions::with_watchdog.

Referenced by executeRelAlgSeq(), and executeRelAlgSubSeq().

1041  {
1042  INJECT_TIMER(executeRelAlgStep);
1043  auto timer = DEBUG_TIMER(__func__);
1044  auto exec_desc_ptr = seq.getDescriptor(step_idx);
1045  CHECK(exec_desc_ptr);
1046  auto& exec_desc = *exec_desc_ptr;
1047  const auto body = exec_desc.getBody();
1048  if (body->isNop()) {
1049  handleNop(exec_desc);
1050  return;
1051  }
1052 
1053  const ExecutionOptions eo_work_unit{
1054  eo.output_columnar_hint,
1055  eo.keep_result,
1056  eo.allow_multifrag,
1057  eo.just_explain,
1058  eo.allow_loop_joins,
1059  eo.with_watchdog && (step_idx == 0 || dynamic_cast<const RelProject*>(body)),
1060  eo.jit_debug,
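1061  eo.just_validate,
1062  eo.with_dynamic_watchdog,
1063  eo.dynamic_watchdog_time_limit,
1064  eo.find_push_down_candidates,
1065  eo.just_calcite_explain,
1066  eo.gpu_input_mem_limit_percent,
1067  eo.allow_runtime_query_interrupt,
1068  eo.running_query_interrupt_freq,
1069  eo.pending_query_interrupt_freq,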
1070  eo.executor_type,
1071  step_idx == 0 ? eo.outer_fragment_indices : std::vector<size_t>()};
1072 
1073  auto handle_hint = [co,
1074  eo_work_unit,
1075  body,
1076  this]() -> std::pair<CompilationOptions, ExecutionOptions> {
1077  ExecutionOptions eo_hint_applied = eo_work_unit;
1078  CompilationOptions co_hint_applied = co;
1079  auto target_node = body;
1080  if (auto sort_body = dynamic_cast<const RelSort*>(body)) {
1081  target_node = sort_body->getInput(0);
1082  }
1083  auto query_hints = getParsedQueryHint(target_node);
1084  auto columnar_output_hint_enabled = false;
1085  auto rowwise_output_hint_enabled = false;
1086  if (query_hints) {
1087  if (query_hints->isHintRegistered(QueryHint::kCpuMode)) {
1088  VLOG(1) << "A user forces to run the query on the CPU execution mode";
1089  co_hint_applied.device_type = ExecutorDeviceType::CPU;
1090  }
1091  if (query_hints->isHintRegistered(QueryHint::kKeepResult)) {
1092  if (!g_enable_data_recycler) {
1093  VLOG(1) << "A user enables keeping query resultset but is skipped since data "
1094  "recycler is disabled";
1095  }
1096  if (!g_use_query_resultset_cache) {
1097  VLOG(1) << "A user enables keeping query resultset but is skipped since query "
1098  "resultset recycler is disabled";
1099  } else {
1100  VLOG(1) << "A user enables keeping query resultset";
1101  eo_hint_applied.keep_result = true;
1102  }
1103  }
1104  if (query_hints->isHintRegistered(QueryHint::kKeepTableFuncResult)) {
1105  // we use this hint within the function 'executeTableFunction`
1106  if (!g_enable_data_recycler) {
1107  VLOG(1) << "A user enables keeping table function's resultset but is skipped "
1108  "since data recycler is disabled";
1109  }
1110  if (!g_use_query_resultset_cache) {
1111  VLOG(1) << "A user enables keeping table function's resultset but is skipped "
1112  "since query resultset recycler is disabled";
1113  } else {
1114  VLOG(1) << "A user enables keeping table function's resultset";
1115  eo_hint_applied.keep_result = true;
1116  }
1117  }
1118  if (query_hints->isHintRegistered(QueryHint::kColumnarOutput)) {
1119  VLOG(1) << "A user forces the query to run with columnar output";
1120  columnar_output_hint_enabled = true;
1121  } else if (query_hints->isHintRegistered(QueryHint::kRowwiseOutput)) {
1122  VLOG(1) << "A user forces the query to run with rowwise output";
1123  rowwise_output_hint_enabled = true;
1124  }
1125  }
1126  auto columnar_output_enabled = eo_work_unit.output_columnar_hint
1127  ? !rowwise_output_hint_enabled
1128  : columnar_output_hint_enabled;
1129  if (g_cluster && (columnar_output_hint_enabled || rowwise_output_hint_enabled)) {
1130  LOG(INFO) << "Currently, we do not support applying query hint to change query "
1131  "output layout in distributed mode.";
1132  }
1133  eo_hint_applied.output_columnar_hint = columnar_output_enabled;
1134  return std::make_pair(co_hint_applied, eo_hint_applied);
1135  };
1136 
1137  auto hint_applied = handle_hint();
1138  setHasStepForUnion(seq.hasQueryStepForUnion());
1139 
1140  if (canUseResultsetCache(eo, render_info) && has_valid_query_plan_dag(body)) {
1141  if (auto cached_resultset =
1142  executor_->getRecultSetRecyclerHolder().getCachedQueryResultSet(
1143  body->getQueryPlanDagHash())) {
1144  VLOG(1) << "recycle resultset of the root node " << body->getRelNodeDagId()
1145  << " from resultset cache";
1146  body->setOutputMetainfo(cached_resultset->getTargetMetaInfo());
1147  if (render_info) {
1148  std::vector<std::shared_ptr<Analyzer::Expr>>& cached_target_exprs =
1149  executor_->getRecultSetRecyclerHolder().getTargetExprs(
1150  body->getQueryPlanDagHash());
1151  std::vector<Analyzer::Expr*> copied_target_exprs;
1152  for (const auto& expr : cached_target_exprs) {
1153  copied_target_exprs.push_back(expr.get());
1154  }
1155  build_render_targets(
1156  *render_info, copied_target_exprs, cached_resultset->getTargetMetaInfo());
1157  }
1158  exec_desc.setResult({cached_resultset, cached_resultset->getTargetMetaInfo()});
1159  addTemporaryTable(-body->getId(), exec_desc.getResult().getDataPtr());
1160  return;
1161  }
1162  }
1163 
1164  const auto compound = dynamic_cast<const RelCompound*>(body);
1165  if (compound) {
1166  if (compound->isDeleteViaSelect()) {
1167  executeDelete(compound, hint_applied.first, hint_applied.second, queue_time_ms);
1168  } else if (compound->isUpdateViaSelect()) {
1169  executeUpdate(compound, hint_applied.first, hint_applied.second, queue_time_ms);
1170  } else {
1171  exec_desc.setResult(executeCompound(
1172  compound, hint_applied.first, hint_applied.second, render_info, queue_time_ms));
1173  VLOG(3) << "Returned from executeCompound(), addTemporaryTable("
1174  << static_cast<int>(-compound->getId()) << ", ...)"
1175  << " exec_desc.getResult().getDataPtr()->rowCount()="
1176  << exec_desc.getResult().getDataPtr()->rowCount();
1177  if (exec_desc.getResult().isFilterPushDownEnabled()) {
1178  return;
1179  }
1180  addTemporaryTable(-compound->getId(), exec_desc.getResult().getDataPtr());
1181  }
1182  return;
1183  }
1184  const auto project = dynamic_cast<const RelProject*>(body);
1185  if (project) {
1186  if (project->isDeleteViaSelect()) {
1187  executeDelete(project, hint_applied.first, hint_applied.second, queue_time_ms);
1188  } else if (project->isUpdateViaSelect()) {
1189  executeUpdate(project, hint_applied.first, hint_applied.second, queue_time_ms);
1190  } else {
1191  std::optional<size_t> prev_count;
1192  // Disabling the intermediate count optimization in distributed, as the previous
1193  // execution descriptor will likely not hold the aggregated result.
1194  if (g_skip_intermediate_count && step_idx > 0 && !g_cluster) {
1195  // If the previous node produced a reliable count, skip the pre-flight count.
1196  RelAlgNode const* const prev_body = project->getInput(0);
1197  if (shared::dynamic_castable_to_any<RelCompound, RelLogicalValues>(prev_body)) {
1198  if (RaExecutionDesc const* const prev_exec_desc =
1199  prev_body->hasContextData()
1200  ? prev_body->getContextData()
1201  : seq.getDescriptorByBodyId(prev_body->getId(), step_idx - 1)) {
1202  const auto& prev_exe_result = prev_exec_desc->getResult();
1203  const auto prev_result = prev_exe_result.getRows();
1204  if (prev_result) {
1205  prev_count = prev_result->rowCount();
1206  VLOG(3) << "Setting output row count for projection node to previous node ("
1207  << prev_exec_desc->getBody()->toString(
1208  RelRexToStringConfig::defaults())
1209  << ") to " << *prev_count;
1210  }
1211  }
1212  }
1213  }
1214  exec_desc.setResult(executeProject(project,
1215  hint_applied.first,
1216  hint_applied.second,
1217  render_info,
1218  queue_time_ms,
1219  prev_count));
1220  VLOG(3) << "Returned from executeProject(), addTemporaryTable("
1221  << static_cast<int>(-project->getId()) << ", ...)"
1222  << " exec_desc.getResult().getDataPtr()->rowCount()="
1223  << exec_desc.getResult().getDataPtr()->rowCount();
1224  if (exec_desc.getResult().isFilterPushDownEnabled()) {
1225  return;
1226  }
1227  addTemporaryTable(-project->getId(), exec_desc.getResult().getDataPtr());
1228  }
1229  return;
1230  }
1231  const auto aggregate = dynamic_cast<const RelAggregate*>(body);
1232  if (aggregate) {
1233  exec_desc.setResult(executeAggregate(
1234  aggregate, hint_applied.first, hint_applied.second, render_info, queue_time_ms));
1235  addTemporaryTable(-aggregate->getId(), exec_desc.getResult().getDataPtr());
1236  return;
1237  }
1238  const auto filter = dynamic_cast<const RelFilter*>(body);
1239  if (filter) {
1240  exec_desc.setResult(executeFilter(
1241  filter, hint_applied.first, hint_applied.second, render_info, queue_time_ms));
1242  addTemporaryTable(-filter->getId(), exec_desc.getResult().getDataPtr());
1243  return;
1244  }
1245  const auto sort = dynamic_cast<const RelSort*>(body);
1246  if (sort) {
1247  exec_desc.setResult(executeSort(
1248  sort, hint_applied.first, hint_applied.second, render_info, queue_time_ms));
1249  if (exec_desc.getResult().isFilterPushDownEnabled()) {
1250  return;
1251  }
1252  addTemporaryTable(-sort->getId(), exec_desc.getResult().getDataPtr());
1253  return;
1254  }
1255  const auto logical_values = dynamic_cast<const RelLogicalValues*>(body);
1256  if (logical_values) {
1257  exec_desc.setResult(executeLogicalValues(logical_values, hint_applied.second));
1258  addTemporaryTable(-logical_values->getId(), exec_desc.getResult().getDataPtr());
1259  return;
1260  }
1261  const auto modify = dynamic_cast<const RelModify*>(body);
1262  if (modify) {
1263  exec_desc.setResult(executeModify(modify, hint_applied.second));
1264  return;
1265  }
1266  const auto logical_union = dynamic_cast<const RelLogicalUnion*>(body);
1267  if (logical_union) {
1268  exec_desc.setResult(executeUnion(logical_union,
1269  seq,
1270  hint_applied.first,
1271  hint_applied.second,
1272  render_info,
1273  queue_time_ms));
1274  addTemporaryTable(-logical_union->getId(), exec_desc.getResult().getDataPtr());
1275  return;
1276  }
1277  const auto table_func = dynamic_cast<const RelTableFunction*>(body);
1278  if (table_func) {
1279  exec_desc.setResult(executeTableFunction(
1280  table_func, hint_applied.first, hint_applied.second, queue_time_ms));
1281  addTemporaryTable(-table_func->getId(), exec_desc.getResult().getDataPtr());
1282  return;
1283  }
1284  LOG(FATAL) << "Unhandled body type: "
1285  << body->toString(RelRexToStringConfig::defaults());
1286 }
ExecutionResult executeAggregate(const RelAggregate *aggregate, const CompilationOptions &co, const ExecutionOptions &eo, RenderInfo *render_info, const int64_t queue_time_ms)
RaExecutionDesc * getDescriptor(size_t idx) const
bool g_use_query_resultset_cache
Definition: Execute.cpp:148
bool has_valid_query_plan_dag(const RelAlgNode *node)
const bool hasQueryStepForUnion() const
bool g_skip_intermediate_count
#define LOG(tag)
Definition: Logger.h:216
std::vector< size_t > outer_fragment_indices
DEVICE void sort(ARGS &&...args)
Definition: gpu_enabled.h:105
ExecutionResult executeModify(const RelModify *modify, const ExecutionOptions &eo)
void addTemporaryTable(const int table_id, const ResultSetPtr &result)
ExecutionResult executeLogicalValues(const RelLogicalValues *, const ExecutionOptions &)
std::optional< RegisteredQueryHint > getParsedQueryHint(const RelAlgNode *node)
ExecutionResult executeFilter(const RelFilter *, const CompilationOptions &, const ExecutionOptions &, RenderInfo *, const int64_t queue_time_ms)
bool g_enable_data_recycler
Definition: Execute.cpp:146
double running_query_interrupt_freq
void handleNop(RaExecutionDesc &ed)
unsigned getId() const
Definition: RelAlgDag.h:814
ExecutorType executor_type
#define INJECT_TIMER(DESC)
Definition: measure.h:93
void executeUpdate(const RelAlgNode *node, const CompilationOptions &co, const ExecutionOptions &eo, const int64_t queue_time_ms)
const RelAlgNode * getInput(const size_t idx) const
Definition: RelAlgDag.h:826
void build_render_targets(RenderInfo &render_info, const std::vector< Analyzer::Expr * > &work_unit_target_exprs, const std::vector< TargetMetaInfo > &targets_meta)
void executeRelAlgStep(const RaExecutionSequence &seq, const size_t step_idx, const CompilationOptions &, const ExecutionOptions &, RenderInfo *, const int64_t queue_time_ms)
unsigned pending_query_interrupt_freq
ExecutorDeviceType device_type
ExecutionResult executeTableFunction(const RelTableFunction *, const CompilationOptions &, const ExecutionOptions &, const int64_t queue_time_ms)
const RaExecutionDesc * getContextData() const
Definition: RelAlgDag.h:822
ExecutionResult executeUnion(const RelLogicalUnion *, const RaExecutionSequence &, const CompilationOptions &, const ExecutionOptions &, RenderInfo *, const int64_t queue_time_ms)
static RelRexToStringConfig defaults()
Definition: RelAlgDag.h:49
bool canUseResultsetCache(const ExecutionOptions &eo, RenderInfo *render_info) const
ExecutionResult executeSort(const RelSort *, const CompilationOptions &, const ExecutionOptions &, RenderInfo *, const int64_t queue_time_ms)
ExecutionResult executeProject(const RelProject *, const CompilationOptions &, const ExecutionOptions &, RenderInfo *, const int64_t queue_time_ms, const std::optional< size_t > previous_count)
#define CHECK(condition)
Definition: Logger.h:222
#define DEBUG_TIMER(name)
Definition: Logger.h:371
ExecutionResult executeCompound(const RelCompound *, const CompilationOptions &, const ExecutionOptions &, RenderInfo *, const int64_t queue_time_ms)
double gpu_input_mem_limit_percent
bool g_cluster
unsigned dynamic_watchdog_time_limit
void executeDelete(const RelAlgNode *node, const CompilationOptions &co, const ExecutionOptions &eo_in, const int64_t queue_time_ms)
RaExecutionDesc * getDescriptorByBodyId(unsigned const body_id, size_t const start_idx) const
Executor * executor_
bool hasContextData() const
Definition: RelAlgDag.h:820
#define VLOG(n)
Definition: Logger.h:316
void setHasStepForUnion(bool flag)

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

ExecutionResult RelAlgExecutor::executeRelAlgSubSeq ( const RaExecutionSequence seq,
const std::pair< size_t, size_t >  interval,
const CompilationOptions co,
const ExecutionOptions eo,
RenderInfo render_info,
const int64_t  queue_time_ms 
)

Definition at line 990 of file RelAlgExecutor.cpp.

References cat_, CHECK, CompilationOptions::device_type, executeRelAlgStep(), executor_, RenderInfo::forceNonInSitu(), g_allow_query_step_cpu_retry, g_cluster, RaExecutionSequence::getDescriptor(), GPU, logger::INFO, INJECT_TIMER, left_deep_join_info_, LOG, CompilationOptions::makeCpuOnly(), now_, gpu_enabled::swap(), and temporary_tables_.

Referenced by executeRelAlgQuerySingleStep().

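996  {
997  INJECT_TIMER(executeRelAlgSubSeq);
998  executor_->setCatalog(&cat_);
999  executor_->temporary_tables_ = &temporary_tables_;
1000  decltype(left_deep_join_info_)().swap(left_deep_join_info_);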
1001  time(&now_);
1002  for (size_t i = interval.first; i < interval.second; i++) {
1003  // only render on the last step
1004  try {
1005  executeRelAlgStep(seq,
1006  i,
1007  co,
1008  eo,
1009  (i == interval.second - 1) ? render_info : nullptr,
1010  queue_time_ms);
1011  } catch (const QueryMustRunOnCpu&) {
1012  // Do not allow per-step retry if flag is off or in distributed mode
1013  // TODO(todd): Determine if and when we can relax this restriction
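1014  // for distributed
1015  CHECK(co.device_type == ExecutorDeviceType::GPU);
1016  if (!g_allow_query_step_cpu_retry || g_cluster) {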
1017  throw;
1018  }
1019  LOG(INFO) << "Retrying current query step " << i << " on CPU";
1020  const auto co_cpu = CompilationOptions::makeCpuOnly(co);
1021  if (render_info && i == interval.second - 1) {
1022  render_info->forceNonInSitu();
1023  }
1024  executeRelAlgStep(seq,
1025  i,
1026  co_cpu,
1027  eo,
1028  (i == interval.second - 1) ? render_info : nullptr,
1029  queue_time_ms);
1030  }
1031  }
1032 
1033  return seq.getDescriptor(interval.second - 1)->getResult();
1034 }
RaExecutionDesc * getDescriptor(size_t idx) const
void forceNonInSitu()
Definition: RenderInfo.cpp:45
#define LOG(tag)
Definition: Logger.h:216
TemporaryTables temporary_tables_
static CompilationOptions makeCpuOnly(const CompilationOptions &in)
#define INJECT_TIMER(DESC)
Definition: measure.h:93
ExecutionResult executeRelAlgSubSeq(const RaExecutionSequence &seq, const std::pair< size_t, size_t > interval, const CompilationOptions &co, const ExecutionOptions &eo, RenderInfo *render_info, const int64_t queue_time_ms)
void executeRelAlgStep(const RaExecutionSequence &seq, const size_t step_idx, const CompilationOptions &, const ExecutionOptions &, RenderInfo *, const int64_t queue_time_ms)
ExecutorDeviceType device_type
std::unordered_map< unsigned, JoinQualsPerNestingLevel > left_deep_join_info_
Catalog_Namespace::Catalog & cat_
#define CHECK(condition)
Definition: Logger.h:222
bool g_allow_query_step_cpu_retry
Definition: Execute.cpp:87
bool g_cluster
Executor * executor_
DEVICE void swap(ARGS &&...args)
Definition: gpu_enabled.h:114

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

ExecutionResult RelAlgExecutor::executeSimpleInsert ( const Analyzer::Query & insert_query,
Fragmenter_Namespace::InsertDataLoader & inserter,
const Catalog_Namespace::SessionInfo & session
)

Definition at line 2821 of file RelAlgExecutor.cpp.

References append_datum(), DataBlockPtr::arraysPtr, cat_, CHECK, CHECK_EQ, checked_malloc(), Fragmenter_Namespace::InsertData::columnIds, CPU, Fragmenter_Namespace::InsertData::data, Fragmenter_Namespace::InsertData::databaseId, Catalog_Namespace::DBMetadata::dbId, executor_, import_export::fill_missing_columns(), get_column_descriptor(), SQLTypeInfo::get_compression(), SQLTypeInfo::get_elem_type(), Analyzer::Query::get_result_col_list(), Analyzer::Query::get_result_table_id(), SQLTypeInfo::get_size(), SQLTypeInfo::get_type(), Analyzer::Query::get_values_lists(), Catalog_Namespace::Catalog::getCurrentDB(), Fragmenter_Namespace::InsertDataLoader::getLeafCount(), Catalog_Namespace::Catalog::getMetadataForDict(), Catalog_Namespace::Catalog::getMetadataForTable(), inline_fixed_encoding_null_val(), anonymous_namespace{RelAlgExecutor.cpp}::insert_one_dict_str(), Fragmenter_Namespace::InsertDataLoader::insertData(), CacheInvalidator< CACHE_HOLDING_TYPES >::invalidateCachesByTable(), is_null(), SQLTypeInfo::is_string(), kARRAY, kBIGINT, kBOOLEAN, kCAST, kCHAR, kDATE, kDECIMAL, kDOUBLE, kENCODING_DICT, kENCODING_NONE, kFLOAT, kINT, kLINESTRING, kMULTILINESTRING, kMULTIPOINT, kMULTIPOLYGON, kNUMERIC, kPOINT, kPOLYGON, kSMALLINT, kTEXT, kTIME, kTIMESTAMP, kTINYINT, kVARCHAR, DataBlockPtr::numbersPtr, Fragmenter_Namespace::InsertData::numRows, anonymous_namespace{TypedDataAccessors.h}::put_null(), anonymous_namespace{TypedDataAccessors.h}::put_null_array(), DataBlockPtr::stringsPtr, Fragmenter_Namespace::InsertData::tableId, and to_string().

2824  {
2825  // Note: We currently obtain an executor for this method, but we do not need it.
2826  // Therefore, we skip the executor state setup in the regular execution path. In the
2827  // future, we will likely want to use the executor to evaluate expressions in the insert
2828  // statement.
2829 
2830  const auto& values_lists = query.get_values_lists();
2831  const int table_id = query.get_result_table_id();
2832  const auto& col_id_list = query.get_result_col_list();
2833  size_t rows_number = values_lists.size();
2834  size_t leaf_count = inserter.getLeafCount();
2835  const auto td = cat_.getMetadataForTable(table_id);
2836  CHECK(td);
2837  size_t rows_per_leaf = rows_number;
2838  if (td->nShards == 0) {
2839  rows_per_leaf =
2840  ceil(static_cast<double>(rows_number) / static_cast<double>(leaf_count));
2841  }
2842  auto max_number_of_rows_per_package =
2843  std::max(size_t(1), std::min(rows_per_leaf, size_t(64 * 1024)));
2844 
2845  std::vector<const ColumnDescriptor*> col_descriptors;
2846  std::vector<int> col_ids;
2847  std::unordered_map<int, std::unique_ptr<uint8_t[]>> col_buffers;
2848  std::unordered_map<int, std::vector<std::string>> str_col_buffers;
2849  std::unordered_map<int, std::vector<ArrayDatum>> arr_col_buffers;
2850  std::unordered_map<int, int> sequential_ids;
2851 
2852  for (const int col_id : col_id_list) {
2853  const auto cd = get_column_descriptor(col_id, table_id, cat_);
2854  const auto col_enc = cd->columnType.get_compression();
2855  if (cd->columnType.is_string()) {
2856  switch (col_enc) {
2857  case kENCODING_NONE: {
2858  auto it_ok =
2859  str_col_buffers.insert(std::make_pair(col_id, std::vector<std::string>{}));
2860  CHECK(it_ok.second);
2861  break;
2862  }
2863  case kENCODING_DICT: {
2864  const auto dd = cat_.getMetadataForDict(cd->columnType.get_comp_param());
2865  CHECK(dd);
2866  const auto it_ok = col_buffers.emplace(
2867  col_id,
2868  std::make_unique<uint8_t[]>(cd->columnType.get_size() *
2869  max_number_of_rows_per_package));
2870  CHECK(it_ok.second);
2871  break;
2872  }
2873  default:
2874  CHECK(false);
2875  }
2876  } else if (cd->columnType.is_geometry()) {
2877  auto it_ok =
2878  str_col_buffers.insert(std::make_pair(col_id, std::vector<std::string>{}));
2879  CHECK(it_ok.second);
2880  } else if (cd->columnType.is_array()) {
2881  auto it_ok =
2882  arr_col_buffers.insert(std::make_pair(col_id, std::vector<ArrayDatum>{}));
2883  CHECK(it_ok.second);
2884  } else {
2885  const auto it_ok = col_buffers.emplace(
2886  col_id,
2887  std::unique_ptr<uint8_t[]>(new uint8_t[cd->columnType.get_logical_size() *
2888  max_number_of_rows_per_package]()));
2889  CHECK(it_ok.second);
2890  }
2891  col_descriptors.push_back(cd);
2892  sequential_ids[col_id] = col_ids.size();
2893  col_ids.push_back(col_id);
2894  }
2895 
2896  // mark the target table's cached item as dirty
2897  std::vector<int> table_chunk_key_prefix{cat_.getCurrentDB().dbId, table_id};
2898  auto table_key = boost::hash_value(table_chunk_key_prefix);
2901 
2902  size_t start_row = 0;
2903  size_t rows_left = rows_number;
2904  while (rows_left != 0) {
2905  // clear the buffers
2906  for (const auto& kv : col_buffers) {
2907  memset(kv.second.get(), 0, max_number_of_rows_per_package);
2908  }
2909  for (auto& kv : str_col_buffers) {
2910  kv.second.clear();
2911  }
2912  for (auto& kv : arr_col_buffers) {
2913  kv.second.clear();
2914  }
2915 
2916  auto package_size = std::min(rows_left, max_number_of_rows_per_package);
2917  // Note: if there will be use cases with batch inserts with lots of rows, it might be
2918  // more efficient to do the loops below column by column instead of row by row.
2919  // But for now I consider such a refactoring not worth investigating, as we have more
2920  // efficient ways to insert many rows anyway.
2921  for (size_t row_idx = 0; row_idx < package_size; ++row_idx) {
2922  const auto& values_list = values_lists[row_idx + start_row];
2923  for (size_t col_idx = 0; col_idx < col_descriptors.size(); ++col_idx) {
2924  CHECK(values_list.size() == col_descriptors.size());
2925  auto col_cv =
2926  dynamic_cast<const Analyzer::Constant*>(values_list[col_idx]->get_expr());
2927  if (!col_cv) {
2928  auto col_cast =
2929  dynamic_cast<const Analyzer::UOper*>(values_list[col_idx]->get_expr());
2930  CHECK(col_cast);
2931  CHECK_EQ(kCAST, col_cast->get_optype());
2932  col_cv = dynamic_cast<const Analyzer::Constant*>(col_cast->get_operand());
2933  }
2934  CHECK(col_cv);
2935  const auto cd = col_descriptors[col_idx];
2936  auto col_datum = col_cv->get_constval();
2937  auto col_type = cd->columnType.get_type();
2938  uint8_t* col_data_bytes{nullptr};
2939  if (!cd->columnType.is_array() && !cd->columnType.is_geometry() &&
2940  (!cd->columnType.is_string() ||
2941  cd->columnType.get_compression() == kENCODING_DICT)) {
2942  const auto col_data_bytes_it = col_buffers.find(col_ids[col_idx]);
2943  CHECK(col_data_bytes_it != col_buffers.end());
2944  col_data_bytes = col_data_bytes_it->second.get();
2945  }
2946  switch (col_type) {
2947  case kBOOLEAN: {
2948  auto col_data = reinterpret_cast<int8_t*>(col_data_bytes);
2949  auto null_bool_val =
2950  col_datum.boolval == inline_fixed_encoding_null_val(cd->columnType);
2951  col_data[row_idx] = col_cv->get_is_null() || null_bool_val
2952  ? inline_fixed_encoding_null_val(cd->columnType)
2953  : (col_datum.boolval ? 1 : 0);
2954  break;
2955  }
2956  case kTINYINT: {
2957  auto col_data = reinterpret_cast<int8_t*>(col_data_bytes);
2958  col_data[row_idx] = col_cv->get_is_null()
2959  ? inline_fixed_encoding_null_val(cd->columnType)
2960  : col_datum.tinyintval;
2961  break;
2962  }
2963  case kSMALLINT: {
2964  auto col_data = reinterpret_cast<int16_t*>(col_data_bytes);
2965  col_data[row_idx] = col_cv->get_is_null()
2966  ? inline_fixed_encoding_null_val(cd->columnType)
2967  : col_datum.smallintval;
2968  break;
2969  }
2970  case kINT: {
2971  auto col_data = reinterpret_cast<int32_t*>(col_data_bytes);
2972  col_data[row_idx] = col_cv->get_is_null()
2973  ? inline_fixed_encoding_null_val(cd->columnType)
2974  : col_datum.intval;
2975  break;
2976  }
2977  case kBIGINT:
2978  case kDECIMAL:
2979  case kNUMERIC: {
2980  auto col_data = reinterpret_cast<int64_t*>(col_data_bytes);
2981  col_data[row_idx] = col_cv->get_is_null()
2982  ? inline_fixed_encoding_null_val(cd->columnType)
2983  : col_datum.bigintval;
2984  break;
2985  }
2986  case kFLOAT: {
2987  auto col_data = reinterpret_cast<float*>(col_data_bytes);
2988  col_data[row_idx] = col_datum.floatval;
2989  break;
2990  }
2991  case kDOUBLE: {
2992  auto col_data = reinterpret_cast<double*>(col_data_bytes);
2993  col_data[row_idx] = col_datum.doubleval;
2994  break;
2995  }
2996  case kTEXT:
2997  case kVARCHAR:
2998  case kCHAR: {
2999  switch (cd->columnType.get_compression()) {
3000  case kENCODING_NONE:
3001  str_col_buffers[col_ids[col_idx]].push_back(
3002  col_datum.stringval ? *col_datum.stringval : "");
3003  break;
3004  case kENCODING_DICT: {
3005  switch (cd->columnType.get_size()) {
3006  case 1:
3008  &reinterpret_cast<uint8_t*>(col_data_bytes)[row_idx],
3009  cd,
3010  col_cv,
3011  cat_);
3012  break;
3013  case 2:
3015  &reinterpret_cast<uint16_t*>(col_data_bytes)[row_idx],
3016  cd,
3017  col_cv,
3018  cat_);
3019  break;
3020  case 4:
3022  &reinterpret_cast<int32_t*>(col_data_bytes)[row_idx],
3023  cd,
3024  col_cv,
3025  cat_);
3026  break;
3027  default:
3028  CHECK(false);
3029  }
3030  break;
3031  }
3032  default:
3033  CHECK(false);
3034  }
3035  break;
3036  }
3037  case kTIME:
3038  case kTIMESTAMP:
3039  case kDATE: {
3040  auto col_data = reinterpret_cast<int64_t*>(col_data_bytes);
3041  col_data[row_idx] = col_cv->get_is_null()
3042  ? inline_fixed_encoding_null_val(cd->columnType)
3043  : col_datum.bigintval;
3044  break;
3045  }
3046  case kARRAY: {
3047  const auto is_null = col_cv->get_is_null();
3048  const auto size = cd->columnType.get_size();
3049  const SQLTypeInfo elem_ti = cd->columnType.get_elem_type();
3050  // POINT coords: [un]compressed coords always need to be encoded, even if NULL
3051  const auto is_point_coords =
3052  (cd->isGeoPhyCol && elem_ti.get_type() == kTINYINT);
3053  if (is_null && !is_point_coords) {
3054  if (size > 0) {
3055  int8_t* buf = (int8_t*)checked_malloc(size);
3056  put_null_array(static_cast<void*>(buf), elem_ti, "");
3057  for (int8_t* p = buf + elem_ti.get_size(); (p - buf) < size;
3058  p += elem_ti.get_size()) {
3059  put_null(static_cast<void*>(p), elem_ti, "");
3060  }
3061  arr_col_buffers[col_ids[col_idx]].emplace_back(size, buf, is_null);
3062  } else {
3063  arr_col_buffers[col_ids[col_idx]].emplace_back(0, nullptr, is_null);
3064  }
3065  break;
3066  }
3067  const auto l = col_cv->get_value_list();
3068  size_t len = l.size() * elem_ti.get_size();
3069  if (size > 0 && static_cast<size_t>(size) != len) {
3070  throw std::runtime_error("Array column " + cd->columnName + " expects " +
3071  std::to_string(size / elem_ti.get_size()) +
3072  " values, " + "received " +
3073  std::to_string(l.size()));
3074  }
3075  if (elem_ti.is_string()) {
3076  CHECK(kENCODING_DICT == elem_ti.get_compression());
3077  CHECK(4 == elem_ti.get_size());
3078 
3079  int8_t* buf = (int8_t*)checked_malloc(len);
3080  int32_t* p = reinterpret_cast<int32_t*>(buf);
3081 
3082  int elemIndex = 0;
3083  for (auto& e : l) {
3084  auto c = std::dynamic_pointer_cast<Analyzer::Constant>(e);
3085  CHECK(c);
3087  &p[elemIndex], cd->columnName, elem_ti, c.get(), cat_);
3088  elemIndex++;
3089  }
3090  arr_col_buffers[col_ids[col_idx]].push_back(ArrayDatum(len, buf, is_null));
3091 
3092  } else {
3093  int8_t* buf = (int8_t*)checked_malloc(len);
3094  int8_t* p = buf;
3095  for (auto& e : l) {
3096  auto c = std::dynamic_pointer_cast<Analyzer::Constant>(e);
3097  CHECK(c);
3098  p = append_datum(p, c->get_constval(), elem_ti);
3099  CHECK(p);
3100  }
3101  arr_col_buffers[col_ids[col_idx]].push_back(ArrayDatum(len, buf, is_null));
3102  }
3103  break;
3104  }
3105  case kPOINT:
3106  case kMULTIPOINT:
3107  case kLINESTRING:
3108  case kMULTILINESTRING:
3109  case kPOLYGON:
3110  case kMULTIPOLYGON:
3111  str_col_buffers[col_ids[col_idx]].push_back(
3112  col_datum.stringval ? *col_datum.stringval : "");
3113  break;
3114  default:
3115  CHECK(false);
3116  }
3117  }
3118  }
3119  start_row += package_size;
3120  rows_left -= package_size;
3121 
3123  insert_data.databaseId = cat_.getCurrentDB().dbId;
3124  insert_data.tableId = table_id;
3125  insert_data.data.resize(col_ids.size());
3126  insert_data.columnIds = col_ids;
3127  for (const auto& kv : col_buffers) {
3128  DataBlockPtr p;
3129  p.numbersPtr = reinterpret_cast<int8_t*>(kv.second.get());
3130  insert_data.data[sequential_ids[kv.first]] = p;
3131  }
3132  for (auto& kv : str_col_buffers) {
3133  DataBlockPtr p;
3134  p.stringsPtr = &kv.second;
3135  insert_data.data[sequential_ids[kv.first]] = p;
3136  }
3137  for (auto& kv : arr_col_buffers) {
3138  DataBlockPtr p;
3139  p.arraysPtr = &kv.second;
3140  insert_data.data[sequential_ids[kv.first]] = p;
3141  }
3142  insert_data.numRows = package_size;
3143  auto data_memory_holder = import_export::fill_missing_columns(&cat_, insert_data);
3144  inserter.insertData(session, insert_data);
3145  }
3146 
3147  auto rs = std::make_shared<ResultSet>(TargetInfoList{},
3150  executor_->getRowSetMemoryOwner(),
3151  nullptr,
3152  0,
3153  0);
3154  std::vector<TargetMetaInfo> empty_targets;
3155  return {rs, empty_targets};
3156 }
static void invalidateCachesByTable(size_t table_key)
#define CHECK_EQ(x, y)
Definition: Logger.h:230
HOST DEVICE int get_size() const
Definition: sqltypes.h:414
int8_t * append_datum(int8_t *buf, const Datum &d, const SQLTypeInfo &ti)
Definition: Datum.cpp:518
Definition: sqltypes.h:63
std::vector< std::string > * stringsPtr
Definition: sqltypes.h:247
std::vector< ArrayDatum > * arraysPtr
Definition: sqltypes.h:248
std::vector< std::unique_ptr< TypedImportBuffer > > fill_missing_columns(const Catalog_Namespace::Catalog *cat, Fragmenter_Namespace::InsertData &insert_data)
Definition: Importer.cpp:6259
int64_t insert_one_dict_str(T *col_data, const std::string &columnName, const SQLTypeInfo &columnType, const Analyzer::Constant *col_cv, const Catalog_Namespace::Catalog &catalog)
Definition: sqldefs.h:48
std::vector< TargetInfo > TargetInfoList
HOST DEVICE SQLTypes get_type() const
Definition: sqltypes.h:404
void insertData(const Catalog_Namespace::SessionInfo &session_info, InsertData &insert_data)
std::string to_string(char const *&&v)
int tableId
identifies the table into which the data is being inserted
Definition: Fragmenter.h:70
std::conditional_t< is_cuda_compiler(), DeviceArrayDatum, HostArrayDatum > ArrayDatum
Definition: sqltypes.h:228
size_t numRows
the number of rows being inserted
Definition: Fragmenter.h:72
CONSTEXPR DEVICE bool is_null(const T &value)
const DBMetadata & getCurrentDB() const
Definition: Catalog.h:242
void * checked_malloc(const size_t size)
Definition: checked_alloc.h:45
void put_null_array(void *ndptr, const SQLTypeInfo &ntype, const std::string col_name)
const DictDescriptor * getMetadataForDict(int dict_ref, bool loadDict=true) const
Definition: Catalog.cpp:1960
void put_null(void *ndptr, const SQLTypeInfo &ntype, const std::string col_name)
Definition: sqltypes.h:66
Definition: sqltypes.h:67
HOST DEVICE EncodingType get_compression() const
Definition: sqltypes.h:412
std::vector< DataBlockPtr > data
the per-column data blocks for the row(s) being inserted
Definition: Fragmenter.h:73
Catalog_Namespace::Catalog & cat_
Definition: sqltypes.h:55
#define CHECK(condition)
Definition: Logger.h:222
int64_t inline_fixed_encoding_null_val(const SQL_TYPE_INFO &ti)
The data to be inserted using the fragment manager.
Definition: Fragmenter.h:68
Definition: sqltypes.h:59
const TableDescriptor * getMetadataForTable(const std::string &tableName, const bool populateFragmenter=true) const
Returns a pointer to a const TableDescriptor struct matching the provided tableName.
bool is_string() const
Definition: sqltypes.h:600
int8_t * numbersPtr
Definition: sqltypes.h:246
SQLTypeInfo get_elem_type() const
Definition: sqltypes.h:981
std::vector< int > columnIds
a vector of column ids for the row(s) being inserted
Definition: Fragmenter.h:71
Executor * executor_
const ColumnDescriptor * get_column_descriptor(const int col_id, const int table_id, const Catalog_Namespace::Catalog &cat)
Definition: Execute.h:191

+ Here is the call graph for this function:

ExecutionResult RelAlgExecutor::executeSort ( const RelSort * sort,
const CompilationOptions & co,
const ExecutionOptions & eo,
RenderInfo * render_info,
const int64_t queue_time_ms
)
private

Definition at line 3175 of file RelAlgExecutor.cpp.

References SpeculativeTopNBlacklist::add(), addTemporaryTable(), anonymous_namespace{RelAlgExecutor.cpp}::build_render_targets(), canUseResultsetCache(), CHECK, CHECK_EQ, anonymous_namespace{RelAlgExecutor.cpp}::check_sort_node_source_constraint(), RelSort::collationCount(), createSortInputWorkUnit(), DEBUG_TIMER, executeWorkUnit(), executor_, anonymous_namespace{RelAlgExecutor.cpp}::first_oe_is_desc(), g_cluster, anonymous_namespace{RelAlgExecutor.cpp}::get_order_entries(), ExecutionResult::getDataPtr(), RelAlgNode::getId(), RelAlgNode::getInput(), RelSort::getLimit(), RelSort::getOffset(), GPU, anonymous_namespace{RelAlgExecutor.cpp}::has_valid_query_plan_dag(), RelSort::isEmptyResult(), leaf_results_, anonymous_namespace{RelAlgExecutor.cpp}::node_is_aggregate(), ExecutionOptions::output_columnar_hint, run_benchmark_import::result, RelAlgNode::setOutputMetainfo(), gpu_enabled::sort(), speculative_topn_blacklist_, temporary_tables_, use_speculative_top_n(), and VLOG.

Referenced by executeRelAlgStep().

3179  {
3180  auto timer = DEBUG_TIMER(__func__);
3182  const auto source = sort->getInput(0);
3183  const bool is_aggregate = node_is_aggregate(source);
3184  auto it = leaf_results_.find(sort->getId());
3185  auto order_entries = get_order_entries(sort);
3186  if (it != leaf_results_.end()) {
3187  // Add any transient string literals to the sdp on the agg
3188  const auto source_work_unit = createSortInputWorkUnit(sort, order_entries, eo);
3189  executor_->addTransientStringLiterals(source_work_unit.exe_unit,
3190  executor_->row_set_mem_owner_);
3191  // Handle push-down for LIMIT for multi-node
3192  auto& aggregated_result = it->second;
3193  auto& result_rows = aggregated_result.rs;
3194  const size_t limit = sort->getLimit();
3195  const size_t offset = sort->getOffset();
3196  if (limit || offset) {
3197  if (!order_entries.empty()) {
3198  result_rows->sort(order_entries, limit + offset, executor_);
3199  }
3200  result_rows->dropFirstN(offset);
3201  if (limit) {
3202  result_rows->keepFirstN(limit);
3203  }
3204  }
3205 
3206  if (render_info) {
3207  // We've hit a sort step that is the very last step
3208  // in a distributed render query. We'll fill in the render targets
3209  // since we have all that data needed to do so. This is normally
3210  // done in executeWorkUnit, but that is bypassed in this case.
3211  build_render_targets(*render_info,
3212  source_work_unit.exe_unit.target_exprs,
3213  aggregated_result.targets_meta);
3214  }
3215 
3216  ExecutionResult result(result_rows, aggregated_result.targets_meta);
3217  sort->setOutputMetainfo(aggregated_result.targets_meta);
3218 
3219  return result;
3220  }
3221 
3222  std::list<std::shared_ptr<Analyzer::Expr>> groupby_exprs;
3223  bool is_desc{false};
3224  bool use_speculative_top_n_sort{false};
3225 
3226  auto execute_sort_query = [this,
3227  sort,
3228  &source,
3229  &is_aggregate,
3230  &eo,
3231  &co,
3232  render_info,
3233  queue_time_ms,
3234  &groupby_exprs,
3235  &is_desc,
3236  &order_entries,
3237  &use_speculative_top_n_sort]() -> ExecutionResult {
3238  const size_t limit = sort->getLimit();
3239  const size_t offset = sort->getOffset();
3240  // check whether sort's input is cached
3241  auto source_node = sort->getInput(0);
3242  CHECK(source_node);
3243  ExecutionResult source_result{nullptr, {}};
3244  auto source_query_plan_dag = source_node->getQueryPlanDagHash();
3245  bool enable_resultset_recycler = canUseResultsetCache(eo, render_info);
3246  if (enable_resultset_recycler && has_valid_query_plan_dag(source_node) &&
3247  !sort->isEmptyResult()) {
3248  if (auto cached_resultset =
3249  executor_->getRecultSetRecyclerHolder().getCachedQueryResultSet(
3250  source_query_plan_dag)) {
3251  CHECK(cached_resultset->canUseSpeculativeTopNSort());
3252  VLOG(1) << "recycle resultset of the root node " << source_node->getRelNodeDagId()
3253  << " from resultset cache";
3254  source_result =
3255  ExecutionResult{cached_resultset, cached_resultset->getTargetMetaInfo()};
3256  if (temporary_tables_.find(-source_node->getId()) == temporary_tables_.end()) {
3257  addTemporaryTable(-source_node->getId(), cached_resultset);
3258  }
3259  use_speculative_top_n_sort = *cached_resultset->canUseSpeculativeTopNSort() &&
3260  co.device_type == ExecutorDeviceType::GPU;
3261  source_node->setOutputMetainfo(cached_resultset->getTargetMetaInfo());
3262  sort->setOutputMetainfo(source_node->getOutputMetainfo());
3263  }
3264  }
3265  if (!source_result.getDataPtr()) {
3266  const auto source_work_unit = createSortInputWorkUnit(sort, order_entries, eo);
3267  is_desc = first_oe_is_desc(order_entries);
3268  ExecutionOptions eo_copy = {
3270  eo.keep_result,
3271  eo.allow_multifrag,
3272  eo.just_explain,
3273  eo.allow_loop_joins,
3274  eo.with_watchdog,
3275  eo.jit_debug,
3276  eo.just_validate || sort->isEmptyResult(),
3277  eo.with_dynamic_watchdog,
3278  eo.dynamic_watchdog_time_limit,
3279  eo.find_push_down_candidates,
3280  eo.just_calcite_explain,
3281  eo.gpu_input_mem_limit_percent,
3282  eo.allow_runtime_query_interrupt,
3283  eo.running_query_interrupt_freq,
3284  eo.pending_query_interrupt_freq,
3285  eo.executor_type,
3286  };
3287 
3288  groupby_exprs = source_work_unit.exe_unit.groupby_exprs;
3289  source_result = executeWorkUnit(source_work_unit,
3290  source->getOutputMetainfo(),
3291  is_aggregate,
3292  co,
3293  eo_copy,
3294  render_info,
3295  queue_time_ms);
3296  use_speculative_top_n_sort =
3297  source_result.getDataPtr() && source_result.getRows()->hasValidBuffer() &&
3298  use_speculative_top_n(source_work_unit.exe_unit,
3299  source_result.getRows()->getQueryMemDesc());
3300  }
3301  if (render_info && render_info->isInSitu()) {
3302  return source_result;
3303  }
3304  if (source_result.isFilterPushDownEnabled()) {
3305  return source_result;
3306  }
3307  auto rows_to_sort = source_result.getRows();
3308  if (eo.just_explain) {
3309  return {rows_to_sort, {}};
3310  }
3311  if (sort->collationCount() != 0 && !rows_to_sort->definitelyHasNoRows() &&
3312  !use_speculative_top_n_sort) {
3313  const size_t top_n = limit == 0 ? 0 : limit + offset;
3314  rows_to_sort->sort(order_entries, top_n, executor_);
3315  }
3316  if (limit || offset) {
3317  if (g_cluster && sort->collationCount() == 0) {
3318  if (offset >= rows_to_sort->rowCount()) {
3319  rows_to_sort->dropFirstN(offset);
3320  } else {
3321  rows_to_sort->keepFirstN(limit + offset);
3322  }
3323  } else {
3324  rows_to_sort->dropFirstN(offset);
3325  if (limit) {
3326  rows_to_sort->keepFirstN(limit);
3327  }
3328  }
3329  }
3330  return {rows_to_sort, source_result.getTargetsMeta()};
3331  };
3332 
3333  try {
3334  return execute_sort_query();
3335  } catch (const SpeculativeTopNFailed& e) {
3336  CHECK_EQ(size_t(1), groupby_exprs.size());
3337  CHECK(groupby_exprs.front());
3338  speculative_topn_blacklist_.add(groupby_exprs.front(), is_desc);
3339  return execute_sort_query();
3340  }
3341 }
#define CHECK_EQ(x, y)
Definition: Logger.h:230
size_t getOffset() const
Definition: RelAlgDag.h:1835
bool has_valid_query_plan_dag(const RelAlgNode *node)
static SpeculativeTopNBlacklist speculative_topn_blacklist_
TemporaryTables temporary_tables_
bool use_speculative_top_n(const RelAlgExecutionUnit &ra_exe_unit, const QueryMemoryDescriptor &query_mem_desc)
DEVICE void sort(ARGS &&...args)
Definition: gpu_enabled.h:105
void addTemporaryTable(const int table_id, const ResultSetPtr &result)
void check_sort_node_source_constraint(const RelSort *sort)
std::unordered_map< unsigned, AggregatedResult > leaf_results_
const ResultSetPtr & getDataPtr() const
unsigned getId() const
Definition: RelAlgDag.h:814
const RelAlgNode * getInput(const size_t idx) const
Definition: RelAlgDag.h:826
void build_render_targets(RenderInfo &render_info, const std::vector< Analyzer::Expr * > &work_unit_target_exprs, const std::vector< TargetMetaInfo > &targets_meta)
bool node_is_aggregate(const RelAlgNode *ra)
bool isEmptyResult() const
Definition: RelAlgDag.h:1829
ExecutionResult executeWorkUnit(const WorkUnit &work_unit, const std::vector< TargetMetaInfo > &targets_meta, const bool is_agg, const CompilationOptions &co_in, const ExecutionOptions &eo_in, RenderInfo *, const int64_t queue_time_ms, const std::optional< size_t > previous_count=std::nullopt)
size_t collationCount() const
Definition: RelAlgDag.h:1816
size_t getLimit() const
Definition: RelAlgDag.h:1833
bool canUseResultsetCache(const ExecutionOptions &eo, RenderInfo *render_info) const
#define CHECK(condition)
Definition: Logger.h:222
#define DEBUG_TIMER(name)
Definition: Logger.h:371
void setOutputMetainfo(const std::vector< TargetMetaInfo > &targets_metainfo) const
Definition: RelAlgDag.h:795
std::list< Analyzer::OrderEntry > get_order_entries(const RelSort *sort)
bool g_cluster
void add(const std::shared_ptr< Analyzer::Expr > expr, const bool desc)
Executor * executor_
#define VLOG(n)
Definition: Logger.h:316
bool first_oe_is_desc(const std::list< Analyzer::OrderEntry > &order_entries)
WorkUnit createSortInputWorkUnit(const RelSort *, std::list< Analyzer::OrderEntry > &order_entries, const ExecutionOptions &eo)

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

ExecutionResult RelAlgExecutor::executeTableFunction ( const RelTableFunction * table_func,
const CompilationOptions & co_in,
const ExecutionOptions & eo,
const int64_t queue_time_ms
)
private

Definition at line 2312 of file RelAlgExecutor.cpp.

References addTemporaryTable(), canUseResultsetCache(), cat_, CHECK, createTableFunctionWorkUnit(), DEBUG_TIMER, CompilationOptions::device_type, Executor::ERR_OUT_OF_GPU_MEM, executor_, g_allow_auto_resultset_caching, g_auto_resultset_caching_threshold, g_cluster, g_enable_table_functions, get_table_infos(), QueryExecutionError::getErrorCode(), getGlobalQueryHint(), RelAlgNode::getQueryPlanDagHash(), RelAlgNode::getRelNodeDagId(), ScanNodeTableKeyCollector::getScanNodeTableKey(), GPU, handlePersistentError(), anonymous_namespace{RelAlgExecutor.cpp}::has_valid_query_plan_dag(), hasStepForUnion(), INJECT_TIMER, anonymous_namespace{RelAlgExecutor.cpp}::is_validate_or_explain_query(), ExecutionOptions::just_explain, ExecutionOptions::keep_result, kKeepTableFuncResult, run_benchmark_import::result, target_exprs_owned_, timer_start(), timer_stop(), and VLOG.

Referenced by executeRelAlgStep().

2315  {
2317  auto timer = DEBUG_TIMER(__func__);
2318 
2319  auto co = co_in;
2320 
2321  if (g_cluster) {
2322  throw std::runtime_error("Table functions not supported in distributed mode yet");
2323  }
2324  if (!g_enable_table_functions) {
2325  throw std::runtime_error("Table function support is disabled");
2326  }
2327  auto table_func_work_unit = createTableFunctionWorkUnit(
2328  table_func,
2329  eo.just_explain,
2330  /*is_gpu = */ co.device_type == ExecutorDeviceType::GPU);
2331  const auto body = table_func_work_unit.body;
2332  CHECK(body);
2333 
2334  const auto table_infos =
2335  get_table_infos(table_func_work_unit.exe_unit.input_descs, executor_);
2336 
2337  ExecutionResult result{std::make_shared<ResultSet>(std::vector<TargetInfo>{},
2338  co.device_type,
2340  nullptr,
2341  executor_->getCatalog(),
2342  executor_->blockSize(),
2343  executor_->gridSize()),
2344  {}};
2345 
2346  auto global_hint = getGlobalQueryHint();
2347  auto use_resultset_recycler = canUseResultsetCache(eo, nullptr);
2348  if (use_resultset_recycler && has_valid_query_plan_dag(table_func)) {
2349  auto cached_resultset =
2350  executor_->getRecultSetRecyclerHolder().getCachedQueryResultSet(
2351  table_func->getQueryPlanDagHash());
2352  if (cached_resultset) {
2353  VLOG(1) << "recycle table function's resultset of the root node "
2354  << table_func->getRelNodeDagId() << " from resultset cache";
2355  result = {cached_resultset, cached_resultset->getTargetMetaInfo()};
2356  addTemporaryTable(-body->getId(), result.getDataPtr());
2357  return result;
2358  }
2359  }
2360 
2361  auto query_exec_time_begin = timer_start();
2362  try {
2363  result = {executor_->executeTableFunction(
2364  table_func_work_unit.exe_unit, table_infos, co, eo, cat_),
2365  body->getOutputMetainfo()};
2366  } catch (const QueryExecutionError& e) {
2369  throw std::runtime_error("Table function ran out of memory during execution");
2370  }
2371  auto query_exec_time = timer_stop(query_exec_time_begin);
2372  result.setQueueTime(queue_time_ms);
2373  auto resultset_ptr = result.getDataPtr();
2374  auto allow_auto_caching_resultset = resultset_ptr && resultset_ptr->hasValidBuffer() &&
2376  resultset_ptr->getBufferSizeBytes(co.device_type) <=
2378  bool keep_result = global_hint->isHintRegistered(QueryHint::kKeepTableFuncResult);
2379  if (use_resultset_recycler && (keep_result || allow_auto_caching_resultset) &&
2380  !hasStepForUnion()) {
2381  resultset_ptr->setExecTime(query_exec_time);
2382  resultset_ptr->setQueryPlanHash(table_func_work_unit.exe_unit.query_plan_dag_hash);
2383  resultset_ptr->setTargetMetaInfo(body->getOutputMetainfo());
2384  auto input_table_keys = ScanNodeTableKeyCollector::getScanNodeTableKey(body);
2385  resultset_ptr->setInputTableKeys(std::move(input_table_keys));
2386  if (allow_auto_caching_resultset) {
2387  VLOG(1) << "Automatically keep table function's query resultset to recycler";
2388  }
2389  executor_->getRecultSetRecyclerHolder().putQueryResultSetToCache(
2390  table_func_work_unit.exe_unit.query_plan_dag_hash,
2391  resultset_ptr->getInputTableKeys(),
2392  resultset_ptr,
2393  resultset_ptr->getBufferSizeBytes(co.device_type),
2395  } else {
2396  if (eo.keep_result) {
2397  if (g_cluster) {
2398  VLOG(1) << "Query hint \'keep_table_function_result\' is ignored since we do not "
2399  "support resultset recycling on distributed mode";
2400  } else if (hasStepForUnion()) {
2401  VLOG(1) << "Query hint \'keep_table_function_result\' is ignored since a query "
2402  "has union-(all) operator";
2403  } else if (is_validate_or_explain_query(eo)) {
2404  VLOG(1) << "Query hint \'keep_table_function_result\' is ignored since a query "
2405  "is either validate or explain query";
2406  } else {
2407  VLOG(1) << "Query hint \'keep_table_function_result\' is ignored";
2408  }
2409  }
2410  }
2411 
2412  return result;
2413 }
int32_t getErrorCode() const
Definition: ErrorHandling.h:55
bool has_valid_query_plan_dag(const RelAlgNode *node)
TableFunctionWorkUnit createTableFunctionWorkUnit(const RelTableFunction *table_func, const bool just_explain, const bool is_gpu)
TypeR::rep timer_stop(Type clock_begin)
Definition: measure.h:48
void addTemporaryTable(const int table_id, const ResultSetPtr &result)
static std::unordered_set< size_t > getScanNodeTableKey(RelAlgNode const *rel_alg_node)
bool hasStepForUnion() const
size_t getQueryPlanDagHash() const
Definition: RelAlgDag.h:808
std::vector< std::shared_ptr< Analyzer::Expr > > target_exprs_owned_
#define INJECT_TIMER(DESC)
Definition: measure.h:93
bool g_allow_auto_resultset_caching
Definition: Execute.cpp:150
static const int32_t ERR_OUT_OF_GPU_MEM
Definition: Execute.h:1371
std::optional< RegisteredQueryHint > getGlobalQueryHint()
ExecutionResult executeTableFunction(const RelTableFunction *, const CompilationOptions &, const ExecutionOptions &, const int64_t queue_time_ms)
bool is_validate_or_explain_query(const ExecutionOptions &eo)
Catalog_Namespace::Catalog & cat_
bool canUseResultsetCache(const ExecutionOptions &eo, RenderInfo *render_info) const
static void handlePersistentError(const int32_t error_code)
#define CHECK(condition)
Definition: Logger.h:222
std::vector< InputTableInfo > get_table_infos(const std::vector< InputDescriptor > &input_descs, Executor *executor)
#define DEBUG_TIMER(name)
Definition: Logger.h:371
bool g_cluster
size_t getRelNodeDagId() const
Definition: RelAlgDag.h:862
Executor * executor_
#define VLOG(n)
Definition: Logger.h:316
Type timer_start()
Definition: measure.h:42
size_t g_auto_resultset_caching_threshold
Definition: Execute.cpp:156
bool g_enable_table_functions
Definition: Execute.cpp:112

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

ExecutionResult RelAlgExecutor::executeUnion ( const RelLogicalUnion * logical_union,
const RaExecutionSequence & seq,
const CompilationOptions & co,
const ExecutionOptions & eo,
RenderInfo * render_info,
const int64_t queue_time_ms
)
private

Definition at line 2679 of file RelAlgExecutor.cpp.

References RelLogicalUnion::checkForMatchingMetaInfoTypes(), createUnionWorkUnit(), DEBUG_TIMER, Default, executeWorkUnit(), RelAlgNode::getInput(), RelAlgNode::getOutputMetainfo(), RelLogicalUnion::isAll(), isGeometry(), CompilationOptions::makeCpuOnly(), and RelAlgNode::setOutputMetainfo().

Referenced by executeRelAlgStep().

2684  {
2685  auto timer = DEBUG_TIMER(__func__);
2686  if (!logical_union->isAll()) {
2687  throw std::runtime_error("UNION without ALL is not supported yet.");
2688  }
2689  // Will throw a std::runtime_error if types don't match.
2690  logical_union->checkForMatchingMetaInfoTypes();
2691  logical_union->setOutputMetainfo(logical_union->getInput(0)->getOutputMetainfo());
2692  if (boost::algorithm::any_of(logical_union->getOutputMetainfo(), isGeometry)) {
2693  throw std::runtime_error("UNION does not support subqueries with geo-columns.");
2694  }
2695  auto work_unit =
2696  createUnionWorkUnit(logical_union, {{}, SortAlgorithm::Default, 0, 0}, eo);
2697  return executeWorkUnit(work_unit,
2698  logical_union->getOutputMetainfo(),
2699  false,
2700  CompilationOptions::makeCpuOnly(co),
2701  eo,
2702  render_info,
2703  queue_time_ms);
2704 }
bool isAll() const
Definition: RelAlgDag.h:2301
void checkForMatchingMetaInfoTypes() const
Definition: RelAlgDag.cpp:858
static CompilationOptions makeCpuOnly(const CompilationOptions &in)
bool isGeometry(TargetMetaInfo const &target_meta_info)
const RelAlgNode * getInput(const size_t idx) const
Definition: RelAlgDag.h:826
WorkUnit createUnionWorkUnit(const RelLogicalUnion *, const SortInfo &, const ExecutionOptions &eo)
ExecutionResult executeWorkUnit(const WorkUnit &work_unit, const std::vector< TargetMetaInfo > &targets_meta, const bool is_agg, const CompilationOptions &co_in, const ExecutionOptions &eo_in, RenderInfo *, const int64_t queue_time_ms, const std::optional< size_t > previous_count=std::nullopt)
#define DEBUG_TIMER(name)
Definition: Logger.h:371
void setOutputMetainfo(const std::vector< TargetMetaInfo > &targets_metainfo) const
Definition: RelAlgDag.h:795
const std::vector< TargetMetaInfo > & getOutputMetainfo() const
Definition: