OmniSciDB  c1a53651b2
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
RelAlgExecutor Class Reference

#include <RelAlgExecutor.h>

+ Inheritance diagram for RelAlgExecutor:
+ Collaboration diagram for RelAlgExecutor:

Classes

struct  TableFunctionWorkUnit
 
struct  WorkUnit
 

Public Types

using TargetInfoList = std::vector< TargetInfo >
 

Public Member Functions

 RelAlgExecutor (Executor *executor, std::shared_ptr< const query_state::QueryState > query_state=nullptr)
 
 RelAlgExecutor (Executor *executor, const std::string &query_ra, std::shared_ptr< const query_state::QueryState > query_state=nullptr)
 
 RelAlgExecutor (Executor *executor, std::unique_ptr< RelAlgDag > query_dag, std::shared_ptr< const query_state::QueryState > query_state=nullptr)
 
size_t getOuterFragmentCount (const CompilationOptions &co, const ExecutionOptions &eo)
 
ExecutionResult executeRelAlgQuery (const CompilationOptions &co, const ExecutionOptions &eo, const bool just_explain_plan, RenderInfo *render_info)
 
ExecutionResult executeRelAlgQueryWithFilterPushDown (const RaExecutionSequence &seq, const CompilationOptions &co, const ExecutionOptions &eo, RenderInfo *render_info, const int64_t queue_time_ms)
 
void prepareLeafExecution (const AggregatedColRange &agg_col_range, const StringDictionaryGenerations &string_dictionary_generations, const TableGenerations &table_generations)
 
ExecutionResult executeRelAlgSeq (const RaExecutionSequence &seq, const CompilationOptions &co, const ExecutionOptions &eo, RenderInfo *render_info, const int64_t queue_time_ms, const bool with_existing_temp_tables=false)
 
ExecutionResult executeRelAlgSubSeq (const RaExecutionSequence &seq, const std::pair< size_t, size_t > interval, const CompilationOptions &co, const ExecutionOptions &eo, RenderInfo *render_info, const int64_t queue_time_ms)
 
QueryStepExecutionResult executeRelAlgQuerySingleStep (const RaExecutionSequence &seq, const size_t step_idx, const CompilationOptions &co, const ExecutionOptions &eo, RenderInfo *render_info)
 
void addLeafResult (const unsigned id, const AggregatedResult &result)
 
std::unique_ptr< RelAlgDag > getOwnedRelAlgDag ()
 
RelAlgDag * getRelAlgDag ()
 
const RelAlgNode & getRootRelAlgNode () const
 
void prepareForeignTables ()
 
std::shared_ptr< const RelAlgNode > getRootRelAlgNodeShPtr () const
 
std::pair< std::vector
< unsigned >
, std::unordered_map< unsigned,
JoinQualsPerNestingLevel > > 
getJoinInfo (const RelAlgNode *root_node)
 
std::shared_ptr< RelAlgTranslator > getRelAlgTranslator (const RelAlgNode *root_node)
 
const std::vector
< std::shared_ptr< RexSubQuery > > & 
getSubqueries () const noexcept
 
std::optional
< RegisteredQueryHint > 
getParsedQueryHint (const RelAlgNode *node)
 
std::optional
< std::unordered_map< size_t,
std::unordered_map< unsigned,
RegisteredQueryHint > > > 
getParsedQueryHints ()
 
std::optional
< RegisteredQueryHint > 
getGlobalQueryHint ()
 
ExecutionResult executeSimpleInsert (const Analyzer::Query &insert_query, Fragmenter_Namespace::InsertDataLoader &inserter, const Catalog_Namespace::SessionInfo &session)
 
AggregatedColRange computeColRangesCache ()
 
StringDictionaryGenerations computeStringDictionaryGenerations ()
 
TableGenerations computeTableGenerations ()
 
Executor * getExecutor () const
 
void cleanupPostExecution ()
 
void executePostExecutionCallback ()
 
RaExecutionSequence getRaExecutionSequence (const RelAlgNode *root_node, Executor *executor)
 
void prepareForeignTable ()
 
std::unordered_set
< shared::TableKey > 
getPhysicalTableIds () const
 
void prepareForSystemTableExecution (const CompilationOptions &co) const
 

Static Public Member Functions

static std::string getErrorMessageFromCode (const int32_t error_code)
 

Private Member Functions

void initializeParallelismHints ()
 
ExecutionResult executeRelAlgQueryNoRetry (const CompilationOptions &co, const ExecutionOptions &eo, const bool just_explain_plan, RenderInfo *render_info)
 
void executeRelAlgStep (const RaExecutionSequence &seq, const size_t step_idx, const CompilationOptions &, const ExecutionOptions &, RenderInfo *, const int64_t queue_time_ms)
 
void executeUpdate (const RelAlgNode *node, const CompilationOptions &co, const ExecutionOptions &eo, const int64_t queue_time_ms)
 
void executeDelete (const RelAlgNode *node, const CompilationOptions &co, const ExecutionOptions &eo_in, const int64_t queue_time_ms)
 
ExecutionResult executeCompound (const RelCompound *, const CompilationOptions &, const ExecutionOptions &, RenderInfo *, const int64_t queue_time_ms)
 
ExecutionResult executeAggregate (const RelAggregate *aggregate, const CompilationOptions &co, const ExecutionOptions &eo, RenderInfo *render_info, const int64_t queue_time_ms)
 
ExecutionResult executeProject (const RelProject *, const CompilationOptions &, const ExecutionOptions &, RenderInfo *, const int64_t queue_time_ms, const std::optional< size_t > previous_count)
 
ExecutionResult executeTableFunction (const RelTableFunction *, const CompilationOptions &, const ExecutionOptions &, const int64_t queue_time_ms)
 
ExecutionResult executeFilter (const RelFilter *, const CompilationOptions &, const ExecutionOptions &, RenderInfo *, const int64_t queue_time_ms)
 
ExecutionResult executeSort (const RelSort *, const CompilationOptions &, const ExecutionOptions &, RenderInfo *, const int64_t queue_time_ms)
 
ExecutionResult executeLogicalValues (const RelLogicalValues *, const ExecutionOptions &)
 
ExecutionResult executeModify (const RelModify *modify, const ExecutionOptions &eo)
 
ExecutionResult executeUnion (const RelLogicalUnion *, const RaExecutionSequence &, const CompilationOptions &, const ExecutionOptions &, RenderInfo *, const int64_t queue_time_ms)
 
WorkUnit createSortInputWorkUnit (const RelSort *, std::list< Analyzer::OrderEntry > &order_entries, const ExecutionOptions &eo)
 
ExecutionResult executeWorkUnit (const WorkUnit &work_unit, const std::vector< TargetMetaInfo > &targets_meta, const bool is_agg, const CompilationOptions &co_in, const ExecutionOptions &eo_in, RenderInfo *, const int64_t queue_time_ms, const std::optional< size_t > previous_count=std::nullopt)
 
void computeWindow (const WorkUnit &work_unit, const CompilationOptions &co, const ExecutionOptions &eo, ColumnCacheMap &column_cache_map, const int64_t queue_time_ms)
 
std::unique_ptr
< WindowFunctionContext > 
createWindowFunctionContext (const Analyzer::WindowFunction *window_func, const std::shared_ptr< Analyzer::BinOper > &partition_key_cond, std::unordered_map< QueryPlanHash, std::shared_ptr< HashJoin >> &partition_cache, std::unordered_map< QueryPlanHash, size_t > &sorted_partition_key_ref_count_map, const WorkUnit &work_unit, const std::vector< InputTableInfo > &query_infos, const CompilationOptions &co, ColumnCacheMap &column_cache_map, std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner)
 
size_t getNDVEstimation (const WorkUnit &work_unit, const int64_t range, const bool is_agg, const CompilationOptions &co, const ExecutionOptions &eo)
 
std::optional< size_t > getFilteredCountAll (const WorkUnit &work_unit, const bool is_agg, const CompilationOptions &co, const ExecutionOptions &eo)
 
FilterSelectivity getFilterSelectivity (const std::vector< std::shared_ptr< Analyzer::Expr >> &filter_expressions, const CompilationOptions &co, const ExecutionOptions &eo)
 
std::vector< PushedDownFilterInfo > selectFiltersToBePushedDown (const RelAlgExecutor::WorkUnit &work_unit, const CompilationOptions &co, const ExecutionOptions &eo)
 
bool isRowidLookup (const WorkUnit &work_unit)
 
ExecutionResult handleOutOfMemoryRetry (const RelAlgExecutor::WorkUnit &work_unit, const std::vector< TargetMetaInfo > &targets_meta, const bool is_agg, const CompilationOptions &co, const ExecutionOptions &eo, RenderInfo *render_info, const bool was_multifrag_kernel_launch, const int64_t queue_time_ms)
 
WorkUnit createWorkUnit (const RelAlgNode *, const SortInfo &, const ExecutionOptions &eo)
 
WorkUnit createCompoundWorkUnit (const RelCompound *, const SortInfo &, const ExecutionOptions &eo)
 
WorkUnit createAggregateWorkUnit (const RelAggregate *, const SortInfo &, const bool just_explain)
 
WorkUnit createProjectWorkUnit (const RelProject *, const SortInfo &, const ExecutionOptions &eo)
 
WorkUnit createFilterWorkUnit (const RelFilter *, const SortInfo &, const bool just_explain)
 
WorkUnit createJoinWorkUnit (const RelJoin *, const SortInfo &, const bool just_explain)
 
WorkUnit createUnionWorkUnit (const RelLogicalUnion *, const SortInfo &, const ExecutionOptions &eo)
 
TableFunctionWorkUnit createTableFunctionWorkUnit (const RelTableFunction *table_func, const bool just_explain, const bool is_gpu)
 
void addTemporaryTable (const int table_id, const ResultSetPtr &result)
 
void eraseFromTemporaryTables (const int table_id)
 
void handleNop (RaExecutionDesc &ed)
 
std::unordered_map< unsigned,
JoinQualsPerNestingLevel > & 
getLeftDeepJoinTreesInfo ()
 
JoinQualsPerNestingLevel translateLeftDeepJoinFilter (const RelLeftDeepInnerJoin *join, const std::vector< InputDescriptor > &input_descs, const std::unordered_map< const RelAlgNode *, int > &input_to_nest_level, const bool just_explain)
 
std::list< std::shared_ptr
< Analyzer::Expr > > 
makeJoinQuals (const RexScalar *join_condition, const std::vector< JoinType > &join_types, const std::unordered_map< const RelAlgNode *, int > &input_to_nest_level, const bool just_explain) const
 
void setHasStepForUnion (bool flag)
 
bool hasStepForUnion () const
 
bool canUseResultsetCache (const ExecutionOptions &eo, RenderInfo *render_info) const
 
void setupCaching (const RelAlgNode *ra)
 
- Private Member Functions inherited from StorageIOFacility
 StorageIOFacility (Executor *executor)
 
StorageIOFacility::UpdateCallback yieldUpdateCallback (UpdateTransactionParameters &update_parameters)
 
StorageIOFacility::UpdateCallback yieldDeleteCallback (DeleteTransactionParameters &delete_parameters)
 

Static Private Member Functions

static void handlePersistentError (const int32_t error_code)
 

Private Attributes

Executor * executor_
 
std::unique_ptr< RelAlgDag > query_dag_
 
std::shared_ptr< const
query_state::QueryState > 
query_state_
 
TemporaryTables temporary_tables_
 
time_t now_
 
std::unordered_map< unsigned,
JoinQualsPerNestingLevel > 
left_deep_join_info_
 
std::vector< std::shared_ptr
< Analyzer::Expr > > 
target_exprs_owned_
 
std::unordered_map< unsigned,
AggregatedResult > 
leaf_results_
 
int64_t queue_time_ms_
 
bool has_step_for_union_
 
std::unique_ptr
< TransactionParameters > 
dml_transaction_parameters_
 
std::optional< std::function
< void()> > 
post_execution_callback_
 

Static Private Attributes

static SpeculativeTopNBlacklist speculative_topn_blacklist_
 

Friends

class PendingExecutionClosure
 

Additional Inherited Members

- Private Types inherited from StorageIOFacility
using UpdateCallback = UpdateLogForFragment::Callback
 
using TableDescriptorType = TableDescriptor
 
using DeleteVictimOffsetList = std::vector< uint64_t >
 
using UpdateTargetOffsetList = std::vector< uint64_t >
 
using UpdateTargetTypeList = std::vector< TargetMetaInfo >
 
using UpdateTargetColumnNamesList = std::vector< std::string >
 
using TransactionLog = Fragmenter_Namespace::InsertOrderFragmenter::ModifyTransactionTracker
 
using TransactionLogPtr = std::unique_ptr< TransactionLog >
 
using ColumnValidationFunction = std::function< bool(std::string const &)>
 

Detailed Description

Definition at line 52 of file RelAlgExecutor.h.

Member Typedef Documentation

using RelAlgExecutor::TargetInfoList = std::vector<TargetInfo>

Definition at line 54 of file RelAlgExecutor.h.

Constructor & Destructor Documentation

RelAlgExecutor::RelAlgExecutor ( Executor * executor,
std::shared_ptr< const query_state::QueryState > query_state = nullptr 
)
inline

Definition at line 56 of file RelAlgExecutor.h.

References initializeParallelismHints().

58  : StorageIOFacility(executor)
59  , executor_(executor)
60  , query_state_(std::move(query_state))
61  , now_(0)
62  , queue_time_ms_(0) {
63  initializeParallelismHints();
64  }
int64_t queue_time_ms_
std::shared_ptr< const query_state::QueryState > query_state_
void initializeParallelismHints()
StorageIOFacility(Executor *executor)
Executor * executor_

+ Here is the call graph for this function:

RelAlgExecutor::RelAlgExecutor ( Executor * executor,
const std::string &  query_ra,
std::shared_ptr< const query_state::QueryState > query_state = nullptr 
)
inline

Definition at line 66 of file RelAlgExecutor.h.

References initializeParallelismHints().

69  : StorageIOFacility(executor)
70  , executor_(executor)
71  , query_dag_(RelAlgDagBuilder::buildDag(query_ra, true))
72  , query_state_(std::move(query_state))
73  , now_(0)
74  , queue_time_ms_(0) {
75  initializeParallelismHints();
76  }
int64_t queue_time_ms_
static std::unique_ptr< RelAlgDag > buildDag(const std::string &query_ra, const bool optimize_dag)
Definition: RelAlgDag.cpp:3237
std::unique_ptr< RelAlgDag > query_dag_
std::shared_ptr< const query_state::QueryState > query_state_
void initializeParallelismHints()
StorageIOFacility(Executor *executor)
Executor * executor_

+ Here is the call graph for this function:

RelAlgExecutor::RelAlgExecutor ( Executor * executor,
std::unique_ptr< RelAlgDag > query_dag,
std::shared_ptr< const query_state::QueryState > query_state = nullptr 
)
inline

Definition at line 78 of file RelAlgExecutor.h.

References initializeParallelismHints().

81  : StorageIOFacility(executor)
82  , executor_(executor)
83  , query_dag_(std::move(query_dag))
84  , query_state_(std::move(query_state))
85  , now_(0)
86  , queue_time_ms_(0) {
87  initializeParallelismHints();
88  }
int64_t queue_time_ms_
std::unique_ptr< RelAlgDag > query_dag_
std::shared_ptr< const query_state::QueryState > query_state_
void initializeParallelismHints()
StorageIOFacility(Executor *executor)
Executor * executor_

+ Here is the call graph for this function:

Member Function Documentation

void RelAlgExecutor::addLeafResult ( const unsigned  id,
const AggregatedResult & result 
)
inline

Definition at line 128 of file RelAlgExecutor.h.

References CHECK, and leaf_results_.

128  {
129  const auto it_ok = leaf_results_.emplace(id, result);
130  CHECK(it_ok.second);
131  }
std::unordered_map< unsigned, AggregatedResult > leaf_results_
#define CHECK(condition)
Definition: Logger.h:291

void RelAlgExecutor::addTemporaryTable ( const int  table_id,
const ResultSetPtr & result 
)
inline private

Definition at line 396 of file RelAlgExecutor.h.

References CHECK, CHECK_LT, and temporary_tables_.

Referenced by executeRelAlgSeq(), executeRelAlgStep(), executeSort(), executeTableFunction(), and handleNop().

396  {
397  CHECK_LT(size_t(0), result->colCount());
398  CHECK_LT(table_id, 0);
399  auto it_ok = temporary_tables_.emplace(table_id, result);
400  CHECK(it_ok.second);
401  }
TemporaryTables temporary_tables_
#define CHECK_LT(x, y)
Definition: Logger.h:303
#define CHECK(condition)
Definition: Logger.h:291

+ Here is the caller graph for this function:

bool RelAlgExecutor::canUseResultsetCache ( const ExecutionOptions & eo,
RenderInfo * render_info 
) const
private

Definition at line 490 of file RelAlgExecutor.cpp.

References g_cluster, g_enable_data_recycler, g_use_query_resultset_cache, hasStepForUnion(), anonymous_namespace{RelAlgExecutor.cpp}::is_validate_or_explain_query(), heavyai::InSituFlagsOwnerInterface::isInSitu(), and ExecutionOptions::outer_fragment_indices.

Referenced by executeRelAlgStep(), executeSort(), executeTableFunction(), and executeWorkUnit().

491  {
492  auto validate_or_explain_query = is_validate_or_explain_query(eo);
493  auto query_for_partial_outer_frag = !eo.outer_fragment_indices.empty();
494  return g_enable_data_recycler && g_use_query_resultset_cache && !g_cluster &&
495  !validate_or_explain_query && !hasStepForUnion() &&
496  !query_for_partial_outer_frag &&
497  (!render_info || (render_info && !render_info->isInSitu()));
498 }
bool g_use_query_resultset_cache
Definition: Execute.cpp:148
std::vector< size_t > outer_fragment_indices
bool g_enable_data_recycler
Definition: Execute.cpp:146
bool hasStepForUnion() const
bool is_validate_or_explain_query(const ExecutionOptions &eo)
bool g_cluster

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void RelAlgExecutor::cleanupPostExecution ( )

Definition at line 811 of file RelAlgExecutor.cpp.

References CHECK, and executor_.

Referenced by executeRelAlgQueryNoRetry(), and getOuterFragmentCount().

811  {
812  CHECK(executor_);
813  executor_->row_set_mem_owner_ = nullptr;
814 }
#define CHECK(condition)
Definition: Logger.h:291
Executor * executor_

+ Here is the caller graph for this function:

AggregatedColRange RelAlgExecutor::computeColRangesCache ( )

Definition at line 791 of file RelAlgExecutor.cpp.

References executor_, anonymous_namespace{RelAlgExecutor.cpp}::get_physical_inputs_with_spi_col_id(), and getRootRelAlgNode().

791  {
792  AggregatedColRange agg_col_range_cache;
793  const auto phys_inputs = get_physical_inputs_with_spi_col_id(&getRootRelAlgNode());
794  return executor_->computeColRangesCache(phys_inputs);
795 }
const RelAlgNode & getRootRelAlgNode() const
std::unordered_set< PhysicalInput > get_physical_inputs_with_spi_col_id(const RelAlgNode *ra)
Executor * executor_

+ Here is the call graph for this function:

StringDictionaryGenerations RelAlgExecutor::computeStringDictionaryGenerations ( )

Definition at line 797 of file RelAlgExecutor.cpp.

References executor_, anonymous_namespace{RelAlgExecutor.cpp}::get_physical_inputs_with_spi_col_id(), and getRootRelAlgNode().

797  {
798  const auto phys_inputs = get_physical_inputs_with_spi_col_id(&getRootRelAlgNode());
799  return executor_->computeStringDictionaryGenerations(phys_inputs);
800 }
const RelAlgNode & getRootRelAlgNode() const
std::unordered_set< PhysicalInput > get_physical_inputs_with_spi_col_id(const RelAlgNode *ra)
Executor * executor_

+ Here is the call graph for this function:

TableGenerations RelAlgExecutor::computeTableGenerations ( )

Definition at line 802 of file RelAlgExecutor.cpp.

References executor_, get_physical_table_inputs(), and getRootRelAlgNode().

802  {
803  const auto phys_table_ids = get_physical_table_inputs(&getRootRelAlgNode());
804  return executor_->computeTableGenerations(phys_table_ids);
805 }
std::unordered_set< shared::TableKey > get_physical_table_inputs(const RelAlgNode *ra)
const RelAlgNode & getRootRelAlgNode() const
Executor * executor_

+ Here is the call graph for this function:

void RelAlgExecutor::computeWindow ( const WorkUnit & work_unit,
const CompilationOptions & co,
const ExecutionOptions & eo,
ColumnCacheMap & column_cache_map,
const int64_t  queue_time_ms 
)
private

Definition at line 2528 of file RelAlgExecutor.cpp.

References CHECK, CHECK_EQ, WindowProjectNodeContext::create(), createWindowFunctionContext(), RelAlgExecutor::WorkUnit::exe_unit, executor_, ExecutionOptions::executor_type, Extern, get_table_infos(), Analyzer::WindowFunction::getPartitionKeys(), RelAlgExecutionUnit::input_descs, kBOOLEAN, kBW_EQ, kONE, RelAlgExecutionUnit::target_exprs, and anonymous_namespace{RelAlgExecutor.cpp}::transform_to_inner().

Referenced by executeUpdate(), and executeWorkUnit().

2532  {
2533  auto query_infos = get_table_infos(work_unit.exe_unit.input_descs, executor_);
2534  CHECK_EQ(query_infos.size(), size_t(1));
2535  if (query_infos.front().info.fragments.size() != 1) {
2536  throw std::runtime_error(
2537  "Only single fragment tables supported for window functions for now");
2538  }
2539  if (eo.executor_type == ::ExecutorType::Extern) {
2540  return;
2541  }
2542  query_infos.push_back(query_infos.front());
2543  auto window_project_node_context = WindowProjectNodeContext::create(executor_);
2544  // a query may hold multiple window functions having the same partition by condition
2545  // then after building the first hash partition we can reuse it for the rest of
2546  // the window functions
2547  // here, a cached partition can be shared via multiple window function contexts as is
2548  // but sorted partition should be copied to reuse since we use it for (intermediate)
2549  // output buffer
2550  // todo (yoonmin) : support recycler for window function computation?
2551  std::unordered_map<QueryPlanHash, std::shared_ptr<HashJoin>> partition_cache;
2552  std::unordered_map<QueryPlanHash, std::shared_ptr<std::vector<int64_t>>>
2553  sorted_partition_cache;
2554  std::unordered_map<QueryPlanHash, size_t> sorted_partition_key_ref_count_map;
2555  std::unordered_map<size_t, std::unique_ptr<WindowFunctionContext>>
2556  window_function_context_map;
2557  std::unordered_map<QueryPlanHash, AggregateTreeForWindowFraming> aggregate_tree_map;
2558  for (size_t target_index = 0; target_index < work_unit.exe_unit.target_exprs.size();
2559  ++target_index) {
2560  const auto& target_expr = work_unit.exe_unit.target_exprs[target_index];
2561  const auto window_func = dynamic_cast<const Analyzer::WindowFunction*>(target_expr);
2562  if (!window_func) {
2563  continue;
2564  }
2565  // Always use baseline layout hash tables for now, make the expression a tuple.
2566  const auto& partition_keys = window_func->getPartitionKeys();
2567  std::shared_ptr<Analyzer::BinOper> partition_key_cond;
2568  if (partition_keys.size() >= 1) {
2569  std::shared_ptr<Analyzer::Expr> partition_key_tuple;
2570  if (partition_keys.size() > 1) {
2571  partition_key_tuple = makeExpr<Analyzer::ExpressionTuple>(partition_keys);
2572  } else {
2573  CHECK_EQ(partition_keys.size(), size_t(1));
2574  partition_key_tuple = partition_keys.front();
2575  }
2576  // Creates a tautology equality with the partition expression on both sides.
2577  partition_key_cond =
2578  makeExpr<Analyzer::BinOper>(kBOOLEAN,
2579  kBW_EQ,
2580  kONE,
2581  partition_key_tuple,
2582  transform_to_inner(partition_key_tuple.get()));
2583  }
2584  auto context =
2585  createWindowFunctionContext(window_func,
2586  partition_key_cond /*nullptr if no partition key*/,
2587  partition_cache,
2588  sorted_partition_key_ref_count_map,
2589  work_unit,
2590  query_infos,
2591  co,
2592  column_cache_map,
2593  executor_->getRowSetMemoryOwner());
2594  CHECK(window_function_context_map.emplace(target_index, std::move(context)).second);
2595  }
2596 
2597  for (auto& kv : window_function_context_map) {
2598  kv.second->compute(
2599  sorted_partition_key_ref_count_map, sorted_partition_cache, aggregate_tree_map);
2600  window_project_node_context->addWindowFunctionContext(std::move(kv.second), kv.first);
2601  }
2602 }
#define CHECK_EQ(x, y)
Definition: Logger.h:301
static WindowProjectNodeContext * create(Executor *executor)
std::shared_ptr< Analyzer::Expr > transform_to_inner(const Analyzer::Expr *expr)
std::unique_ptr< WindowFunctionContext > createWindowFunctionContext(const Analyzer::WindowFunction *window_func, const std::shared_ptr< Analyzer::BinOper > &partition_key_cond, std::unordered_map< QueryPlanHash, std::shared_ptr< HashJoin >> &partition_cache, std::unordered_map< QueryPlanHash, size_t > &sorted_partition_key_ref_count_map, const WorkUnit &work_unit, const std::vector< InputTableInfo > &query_infos, const CompilationOptions &co, ColumnCacheMap &column_cache_map, std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner)
ExecutorType executor_type
Definition: sqldefs.h:71
#define CHECK(condition)
Definition: Logger.h:291
std::vector< InputTableInfo > get_table_infos(const std::vector< InputDescriptor > &input_descs, Executor *executor)
Definition: sqldefs.h:30
const std::vector< std::shared_ptr< Analyzer::Expr > > & getPartitionKeys() const
Definition: Analyzer.h:2580
Executor * executor_

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

RelAlgExecutor::WorkUnit RelAlgExecutor::createAggregateWorkUnit ( const RelAggregate * aggregate,
const SortInfo & sort_info,
const bool  just_explain 
)
private

Definition at line 4792 of file RelAlgExecutor.cpp.

References CHECK_EQ, RegisteredQueryHint::defaults(), executor_, QueryPlanDagExtractor::extractJoinInfo(), g_default_max_groups_buffer_entry_guess, anonymous_namespace{RelAlgExecutor.cpp}::get_input_desc(), anonymous_namespace{RelAlgExecutor.cpp}::get_input_nest_levels(), anonymous_namespace{RelAlgExecutor.cpp}::get_join_type(), get_table_infos(), anonymous_namespace{RelAlgExecutor.cpp}::get_targets_meta(), RelAlgNode::getInput(), getLeftDeepJoinTreesInfo(), RelAlgNode::getOutputMetainfo(), RelAlgNode::getQueryPlanDagHash(), RelAlgNode::inputCount(), now_, query_dag_, query_state_, RelAlgNode::setOutputMetainfo(), anonymous_namespace{RelAlgExecutor.cpp}::synthesize_inputs(), target_exprs_owned_, anonymous_namespace{RelAlgExecutor.cpp}::translate_groupby_exprs(), and anonymous_namespace{RelAlgExecutor.cpp}::translate_targets().

Referenced by createWorkUnit(), and executeAggregate().

4795  {
4796  std::vector<InputDescriptor> input_descs;
4797  std::list<std::shared_ptr<const InputColDescriptor>> input_col_descs;
4798  std::vector<std::shared_ptr<RexInput>> used_inputs_owned;
4799  const auto input_to_nest_level = get_input_nest_levels(aggregate, {});
4800  std::tie(input_descs, input_col_descs, used_inputs_owned) =
4801  get_input_desc(aggregate, input_to_nest_level, {});
4802  const auto join_type = get_join_type(aggregate);
4803 
4804  RelAlgTranslator translator(
4805  query_state_, executor_, input_to_nest_level, {join_type}, now_, just_explain);
4806  CHECK_EQ(size_t(1), aggregate->inputCount());
4807  const auto source = aggregate->getInput(0);
4808  const auto& in_metainfo = source->getOutputMetainfo();
4809  const auto scalar_sources =
4810  synthesize_inputs(aggregate, size_t(0), in_metainfo, input_to_nest_level);
4811  const auto groupby_exprs = translate_groupby_exprs(aggregate, scalar_sources);
4812  std::unordered_map<size_t, SQLTypeInfo> target_exprs_type_infos;
4813  const auto target_exprs = translate_targets(target_exprs_owned_,
4814  target_exprs_type_infos,
4815  scalar_sources,
4816  groupby_exprs,
4817  aggregate,
4818  translator);
4819 
4820  const auto query_infos = get_table_infos(input_descs, executor_);
4821 
4822  const auto targets_meta = get_targets_meta(aggregate, target_exprs);
4823  aggregate->setOutputMetainfo(targets_meta);
4824  auto query_hint = RegisteredQueryHint::defaults();
4825  if (query_dag_) {
4826  auto candidate = query_dag_->getQueryHint(aggregate);
4827  if (candidate) {
4828  query_hint = *candidate;
4829  }
4830  }
4831  auto join_info = QueryPlanDagExtractor::extractJoinInfo(
4832  aggregate, std::nullopt, getLeftDeepJoinTreesInfo(), executor_);
4833  return {RelAlgExecutionUnit{input_descs,
4834  input_col_descs,
4835  {},
4836  {},
4837  {},
4838  groupby_exprs,
4839  target_exprs,
4840  target_exprs_type_infos,
4841  nullptr,
4842  sort_info,
4843  0,
4844  query_hint,
4845  aggregate->getQueryPlanDagHash(),
4846  join_info.hash_table_plan_dag,
4847  join_info.table_id_to_node_map,
4848  false,
4849  std::nullopt,
4850  query_state_},
4851  aggregate,
4852  g_default_max_groups_buffer_entry_guess,
4853  nullptr};
4854 }
void setOutputMetainfo(std::vector< TargetMetaInfo > targets_metainfo) const
Definition: RelAlgDag.h:861
#define CHECK_EQ(x, y)
Definition: Logger.h:301
std::unordered_map< const RelAlgNode *, int > get_input_nest_levels(const RelAlgNode *ra_node, const std::vector< size_t > &input_permutation)
static ExtractedJoinInfo extractJoinInfo(const RelAlgNode *top_node, std::optional< unsigned > left_deep_tree_id, std::unordered_map< unsigned, JoinQualsPerNestingLevel > left_deep_tree_infos, Executor *executor)
std::vector< std::shared_ptr< Analyzer::Expr > > synthesize_inputs(const RelAlgNode *ra_node, const size_t nest_level, const std::vector< TargetMetaInfo > &in_metainfo, const std::unordered_map< const RelAlgNode *, int > &input_to_nest_level)
std::tuple< std::vector< InputDescriptor >, std::list< std::shared_ptr< const InputColDescriptor > >, std::vector< std::shared_ptr< RexInput > > > get_input_desc(const RA *ra_node, const std::unordered_map< const RelAlgNode *, int > &input_to_nest_level, const std::vector< size_t > &input_permutation)
std::unordered_map< unsigned, JoinQualsPerNestingLevel > & getLeftDeepJoinTreesInfo()
size_t getQueryPlanDagHash() const
Definition: RelAlgDag.h:874
std::vector< std::shared_ptr< Analyzer::Expr > > target_exprs_owned_
std::vector< Analyzer::Expr * > translate_targets(std::vector< std::shared_ptr< Analyzer::Expr >> &target_exprs_owned, std::unordered_map< size_t, SQLTypeInfo > &target_exprs_type_infos, const std::vector< std::shared_ptr< Analyzer::Expr >> &scalar_sources, const std::list< std::shared_ptr< Analyzer::Expr >> &groupby_exprs, const RelCompound *compound, const RelAlgTranslator &translator, const ExecutorType executor_type)
std::list< std::shared_ptr< Analyzer::Expr > > translate_groupby_exprs(const RelCompound *compound, const std::vector< std::shared_ptr< Analyzer::Expr >> &scalar_sources)
const RelAlgNode * getInput(const size_t idx) const
Definition: RelAlgDag.h:892
JoinType get_join_type(const RelAlgNode *ra)
std::vector< TargetMetaInfo > get_targets_meta(const RA *ra_node, const std::vector< Analyzer::Expr * > &target_exprs)
std::unique_ptr< RelAlgDag > query_dag_
static RegisteredQueryHint defaults()
Definition: QueryHint.h:329
std::shared_ptr< const query_state::QueryState > query_state_
size_t g_default_max_groups_buffer_entry_guess
Definition: Execute.cpp:109
std::vector< InputTableInfo > get_table_infos(const std::vector< InputDescriptor > &input_descs, Executor *executor)
const size_t inputCount() const
Definition: RelAlgDag.h:890
const std::vector< TargetMetaInfo > & getOutputMetainfo() const
Definition: RelAlgDag.h:876
Executor * executor_

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

RelAlgExecutor::WorkUnit RelAlgExecutor::createCompoundWorkUnit ( const RelCompound * compound,
const SortInfo & sort_info,
const ExecutionOptions & eo 
)
private

Definition at line 4473 of file RelAlgExecutor.cpp.

References CHECK_EQ, RegisteredQueryHint::defaults(), anonymous_namespace{RelAlgExecutor.cpp}::do_table_reordering(), executor_, ExecutionOptions::executor_type, QueryPlanDagExtractor::extractJoinInfo(), g_default_max_groups_buffer_entry_guess, g_from_table_reordering, anonymous_namespace{RelAlgExecutor.cpp}::get_input_desc(), anonymous_namespace{RelAlgExecutor.cpp}::get_input_nest_levels(), anonymous_namespace{RelAlgExecutor.cpp}::get_join_type(), anonymous_namespace{RelAlgExecutor.cpp}::get_left_deep_join_input_sizes(), get_table_infos(), anonymous_namespace{RelAlgExecutor.cpp}::get_targets_meta(), RelAlgNode::getInput(), getLeftDeepJoinTreesInfo(), RelAlgNode::getQueryPlanDagHash(), anonymous_namespace{RelAlgExecutor.cpp}::has_valid_query_plan_dag(), RelAlgNode::inputCount(), ExecutionOptions::just_explain, LEFT, anonymous_namespace{RelAlgExecutor.cpp}::left_deep_join_types(), now_, shared::printContainer(), query_dag_, query_state_, anonymous_namespace{RelAlgExecutor.cpp}::rewrite_quals(), RelAlgNode::setOutputMetainfo(), RelAlgExecutionUnit::simple_quals, RelCompound::size(), target_exprs_owned_, anonymous_namespace{RelAlgExecutor.cpp}::translate_groupby_exprs(), anonymous_namespace{RelAlgExecutor.cpp}::translate_quals(), anonymous_namespace{RelAlgExecutor.cpp}::translate_scalar_sources(), anonymous_namespace{RelAlgExecutor.cpp}::translate_targets(), translateLeftDeepJoinFilter(), and VLOG.

Referenced by createWorkUnit(), executeCompound(), executeDelete(), executeUpdate(), and getOuterFragmentCount().

4476  {
4477  std::vector<InputDescriptor> input_descs;
4478  std::list<std::shared_ptr<const InputColDescriptor>> input_col_descs;
4479  auto input_to_nest_level = get_input_nest_levels(compound, {});
4480  std::tie(input_descs, input_col_descs, std::ignore) =
4481  get_input_desc(compound, input_to_nest_level, {});
4482  VLOG(3) << "input_descs=" << shared::printContainer(input_descs);
4483  const auto query_infos = get_table_infos(input_descs, executor_);
4484  CHECK_EQ(size_t(1), compound->inputCount());
4485  const auto left_deep_join =
4486  dynamic_cast<const RelLeftDeepInnerJoin*>(compound->getInput(0));
4487  JoinQualsPerNestingLevel left_deep_join_quals;
4488  const auto join_types = left_deep_join ? left_deep_join_types(left_deep_join)
4489  : std::vector<JoinType>{get_join_type(compound)};
4490  std::vector<size_t> input_permutation;
4491  std::vector<size_t> left_deep_join_input_sizes;
4492  std::optional<unsigned> left_deep_tree_id;
4493  if (left_deep_join) {
4494  left_deep_tree_id = left_deep_join->getId();
4495  left_deep_join_input_sizes = get_left_deep_join_input_sizes(left_deep_join);
4496  left_deep_join_quals = translateLeftDeepJoinFilter(
4497  left_deep_join, input_descs, input_to_nest_level, eo.just_explain);
4498  if (g_from_table_reordering &&
4499  std::find(join_types.begin(), join_types.end(), JoinType::LEFT) ==
4500  join_types.end()) {
4501  input_permutation = do_table_reordering(input_descs,
4502  input_col_descs,
4503  left_deep_join_quals,
4504  input_to_nest_level,
4505  compound,
4506  query_infos,
4507  executor_);
4508  input_to_nest_level = get_input_nest_levels(compound, input_permutation);
4509  std::tie(input_descs, input_col_descs, std::ignore) =
4510  get_input_desc(compound, input_to_nest_level, input_permutation);
4511  left_deep_join_quals = translateLeftDeepJoinFilter(
4512  left_deep_join, input_descs, input_to_nest_level, eo.just_explain);
4513  }
4514  }
4515  RelAlgTranslator translator(
4516  query_state_, executor_, input_to_nest_level, join_types, now_, eo.just_explain);
4517  const auto scalar_sources =
4518  translate_scalar_sources(compound, translator, eo.executor_type);
4519  const auto groupby_exprs = translate_groupby_exprs(compound, scalar_sources);
4520  const auto quals_cf = translate_quals(compound, translator);
4521  std::unordered_map<size_t, SQLTypeInfo> target_exprs_type_infos;
4522  const auto target_exprs = translate_targets(target_exprs_owned_,
4523  target_exprs_type_infos,
4524  scalar_sources,
4525  groupby_exprs,
4526  compound,
4527  translator,
4528  eo.executor_type);
4529 
4530  auto query_hint = RegisteredQueryHint::defaults();
4531  if (query_dag_) {
4532  auto candidate = query_dag_->getQueryHint(compound);
4533  if (candidate) {
4534  query_hint = *candidate;
4535  }
4536  }
4537  CHECK_EQ(compound->size(), target_exprs.size());
4538  const RelAlgExecutionUnit exe_unit = {input_descs,
4539  input_col_descs,
4540  quals_cf.simple_quals,
4541  rewrite_quals(quals_cf.quals),
4542  left_deep_join_quals,
4543  groupby_exprs,
4544  target_exprs,
4545  target_exprs_type_infos,
4546  nullptr,
4547  sort_info,
4548  0,
4549  query_hint,
4550  compound->getQueryPlanDagHash(),
4551  {},
4552  {},
4553  false,
4554  std::nullopt,
4555  query_state_};
4556  auto query_rewriter = std::make_unique<QueryRewriter>(query_infos, executor_);
4557  auto rewritten_exe_unit = query_rewriter->rewrite(exe_unit);
4558  const auto targets_meta = get_targets_meta(compound, rewritten_exe_unit.target_exprs);
4559  compound->setOutputMetainfo(targets_meta);
4560  auto& left_deep_trees_info = getLeftDeepJoinTreesInfo();
4561  if (left_deep_tree_id && left_deep_tree_id.has_value()) {
4562  left_deep_trees_info.emplace(left_deep_tree_id.value(),
4563  rewritten_exe_unit.join_quals);
4564  }
4565  if (has_valid_query_plan_dag(compound)) {
4566  auto join_info = QueryPlanDagExtractor::extractJoinInfo(
4567  compound, left_deep_tree_id, left_deep_trees_info, executor_);
4568  rewritten_exe_unit.hash_table_build_plan_dag = join_info.hash_table_plan_dag;
4569  rewritten_exe_unit.table_id_to_node_map = join_info.table_id_to_node_map;
4570  }
4571  return {rewritten_exe_unit,
4572  compound,
4573  g_default_max_groups_buffer_entry_guess,
4574  std::move(query_rewriter),
4575  input_permutation,
4576  left_deep_join_input_sizes};
4577 }
void setOutputMetainfo(std::vector< TargetMetaInfo > targets_metainfo) const
Definition: RelAlgDag.h:861
#define CHECK_EQ(x, y)
Definition: Logger.h:301
std::unordered_map< const RelAlgNode *, int > get_input_nest_levels(const RelAlgNode *ra_node, const std::vector< size_t > &input_permutation)
std::vector< JoinType > left_deep_join_types(const RelLeftDeepInnerJoin *left_deep_join)
JoinType
Definition: sqldefs.h:165
bool has_valid_query_plan_dag(const RelAlgNode *node)
static ExtractedJoinInfo extractJoinInfo(const RelAlgNode *top_node, std::optional< unsigned > left_deep_tree_id, std::unordered_map< unsigned, JoinQualsPerNestingLevel > left_deep_tree_infos, Executor *executor)
std::vector< size_t > do_table_reordering(std::vector< InputDescriptor > &input_descs, std::list< std::shared_ptr< const InputColDescriptor >> &input_col_descs, const JoinQualsPerNestingLevel &left_deep_join_quals, std::unordered_map< const RelAlgNode *, int > &input_to_nest_level, const RA *node, const std::vector< InputTableInfo > &query_infos, const Executor *executor)
std::tuple< std::vector< InputDescriptor >, std::list< std::shared_ptr< const InputColDescriptor > >, std::vector< std::shared_ptr< RexInput > > > get_input_desc(const RA *ra_node, const std::unordered_map< const RelAlgNode *, int > &input_to_nest_level, const std::vector< size_t > &input_permutation)
size_t size() const override
Definition: RelAlgDag.h:1844
std::unordered_map< unsigned, JoinQualsPerNestingLevel > & getLeftDeepJoinTreesInfo()
std::vector< JoinCondition > JoinQualsPerNestingLevel
size_t getQueryPlanDagHash() const
Definition: RelAlgDag.h:874
std::vector< std::shared_ptr< Analyzer::Expr > > target_exprs_owned_
bool g_from_table_reordering
Definition: Execute.cpp:90
ExecutorType executor_type
std::vector< Analyzer::Expr * > translate_targets(std::vector< std::shared_ptr< Analyzer::Expr >> &target_exprs_owned, std::unordered_map< size_t, SQLTypeInfo > &target_exprs_type_infos, const std::vector< std::shared_ptr< Analyzer::Expr >> &scalar_sources, const std::list< std::shared_ptr< Analyzer::Expr >> &groupby_exprs, const RelCompound *compound, const RelAlgTranslator &translator, const ExecutorType executor_type)
std::list< std::shared_ptr< Analyzer::Expr > > translate_groupby_exprs(const RelCompound *compound, const std::vector< std::shared_ptr< Analyzer::Expr >> &scalar_sources)
const RelAlgNode * getInput(const size_t idx) const
Definition: RelAlgDag.h:892
JoinType get_join_type(const RelAlgNode *ra)
std::vector< TargetMetaInfo > get_targets_meta(const RA *ra_node, const std::vector< Analyzer::Expr * > &target_exprs)
std::unique_ptr< RelAlgDag > query_dag_
static RegisteredQueryHint defaults()
Definition: QueryHint.h:329
std::shared_ptr< const query_state::QueryState > query_state_
std::vector< std::shared_ptr< Analyzer::Expr > > translate_scalar_sources(const RA *ra_node, const RelAlgTranslator &translator, const ::ExecutorType executor_type)
size_t g_default_max_groups_buffer_entry_guess
Definition: Execute.cpp:109
std::vector< InputTableInfo > get_table_infos(const std::vector< InputDescriptor > &input_descs, Executor *executor)
QualsConjunctiveForm translate_quals(const RelCompound *compound, const RelAlgTranslator &translator)
PrintContainer< CONTAINER > printContainer(CONTAINER &container)
Definition: misc.h:107
const size_t inputCount() const
Definition: RelAlgDag.h:890
Executor * executor_
#define VLOG(n)
Definition: Logger.h:387
std::list< std::shared_ptr< Analyzer::Expr > > simple_quals
JoinQualsPerNestingLevel translateLeftDeepJoinFilter(const RelLeftDeepInnerJoin *join, const std::vector< InputDescriptor > &input_descs, const std::unordered_map< const RelAlgNode *, int > &input_to_nest_level, const bool just_explain)
std::vector< size_t > get_left_deep_join_input_sizes(const RelLeftDeepInnerJoin *left_deep_join)
std::list< std::shared_ptr< Analyzer::Expr > > rewrite_quals(const std::list< std::shared_ptr< Analyzer::Expr >> &quals)

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

RelAlgExecutor::WorkUnit RelAlgExecutor::createFilterWorkUnit ( const RelFilter * filter,
const SortInfo & sort_info,
const bool  just_explain 
)
private

Definition at line 5306 of file RelAlgExecutor.cpp.

References CHECK_EQ, RegisteredQueryHint::defaults(), executor_, QueryPlanDagExtractor::extractJoinInfo(), fold_expr(), g_default_max_groups_buffer_entry_guess, anonymous_namespace{RelAlgExecutor.cpp}::get_input_desc(), anonymous_namespace{RelAlgExecutor.cpp}::get_input_nest_levels(), anonymous_namespace{RelAlgExecutor.cpp}::get_inputs_meta(), anonymous_namespace{RelAlgExecutor.cpp}::get_join_type(), anonymous_namespace{RelAlgExecutor.cpp}::get_raw_pointers(), get_table_infos(), RelFilter::getCondition(), getLeftDeepJoinTreesInfo(), RelAlgNode::getQueryPlanDagHash(), RelAlgNode::inputCount(), now_, query_dag_, query_state_, rewrite_expr(), RelAlgNode::setOutputMetainfo(), and target_exprs_owned_.

Referenced by createWorkUnit(), and executeFilter().

5308  {
5309  CHECK_EQ(size_t(1), filter->inputCount());
5310  std::vector<InputDescriptor> input_descs;
5311  std::list<std::shared_ptr<const InputColDescriptor>> input_col_descs;
5312  std::vector<TargetMetaInfo> in_metainfo;
5313  std::vector<std::shared_ptr<RexInput>> used_inputs_owned;
5314  std::vector<std::shared_ptr<Analyzer::Expr>> target_exprs_owned;
5315 
5316  const auto input_to_nest_level = get_input_nest_levels(filter, {});
5317  std::tie(input_descs, input_col_descs, used_inputs_owned) =
5318  get_input_desc(filter, input_to_nest_level, {});
5319  const auto join_type = get_join_type(filter);
5320  RelAlgTranslator translator(
5321  query_state_, executor_, input_to_nest_level, {join_type}, now_, just_explain);
5322  std::tie(in_metainfo, target_exprs_owned) =
5323  get_inputs_meta(filter, translator, used_inputs_owned, input_to_nest_level);
5324  const auto filter_expr = translator.translate(filter->getCondition());
5325  const auto query_infos = get_table_infos(input_descs, executor_);
5326 
5327  const auto qual = fold_expr(filter_expr.get());
5328  target_exprs_owned_.insert(
5329  target_exprs_owned_.end(), target_exprs_owned.begin(), target_exprs_owned.end());
5330 
5331  const auto target_exprs = get_raw_pointers(target_exprs_owned);
5332  filter->setOutputMetainfo(in_metainfo);
5333  const auto rewritten_qual = rewrite_expr(qual.get());
5334  auto query_hint = RegisteredQueryHint::defaults();
5335  if (query_dag_) {
5336  auto candidate = query_dag_->getQueryHint(filter);
5337  if (candidate) {
5338  query_hint = *candidate;
5339  }
5340  }
5341  auto join_info = QueryPlanDagExtractor::extractJoinInfo(
5342  filter, std::nullopt, getLeftDeepJoinTreesInfo(), executor_);
5343  return {{input_descs,
5344  input_col_descs,
5345  {},
5346  {rewritten_qual ? rewritten_qual : qual},
5347  {},
5348  {nullptr},
5349  target_exprs,
5350  {},
5351  nullptr,
5352  sort_info,
5353  0,
5354  query_hint,
5355  filter->getQueryPlanDagHash(),
5356  join_info.hash_table_plan_dag,
5357  join_info.table_id_to_node_map},
5358  filter,
5359  g_default_max_groups_buffer_entry_guess,
5360  nullptr};
5361 }
void setOutputMetainfo(std::vector< TargetMetaInfo > targets_metainfo) const
Definition: RelAlgDag.h:861
#define CHECK_EQ(x, y)
Definition: Logger.h:301
std::vector< Analyzer::Expr * > get_raw_pointers(std::vector< std::shared_ptr< Analyzer::Expr >> const &input)
std::unordered_map< const RelAlgNode *, int > get_input_nest_levels(const RelAlgNode *ra_node, const std::vector< size_t > &input_permutation)
static ExtractedJoinInfo extractJoinInfo(const RelAlgNode *top_node, std::optional< unsigned > left_deep_tree_id, std::unordered_map< unsigned, JoinQualsPerNestingLevel > left_deep_tree_infos, Executor *executor)
std::tuple< std::vector< InputDescriptor >, std::list< std::shared_ptr< const InputColDescriptor > >, std::vector< std::shared_ptr< RexInput > > > get_input_desc(const RA *ra_node, const std::unordered_map< const RelAlgNode *, int > &input_to_nest_level, const std::vector< size_t > &input_permutation)
std::pair< std::vector< TargetMetaInfo >, std::vector< std::shared_ptr< Analyzer::Expr > > > get_inputs_meta(const RelFilter *filter, const RelAlgTranslator &translator, const std::vector< std::shared_ptr< RexInput >> &inputs_owned, const std::unordered_map< const RelAlgNode *, int > &input_to_nest_level)
const RexScalar * getCondition() const
Definition: RelAlgDag.h:1696
std::unordered_map< unsigned, JoinQualsPerNestingLevel > & getLeftDeepJoinTreesInfo()
Analyzer::ExpressionPtr rewrite_expr(const Analyzer::Expr *expr)
size_t getQueryPlanDagHash() const
Definition: RelAlgDag.h:874
std::vector< std::shared_ptr< Analyzer::Expr > > target_exprs_owned_
JoinType get_join_type(const RelAlgNode *ra)
std::unique_ptr< RelAlgDag > query_dag_
static RegisteredQueryHint defaults()
Definition: QueryHint.h:329
std::shared_ptr< const query_state::QueryState > query_state_
size_t g_default_max_groups_buffer_entry_guess
Definition: Execute.cpp:109
std::vector< InputTableInfo > get_table_infos(const std::vector< InputDescriptor > &input_descs, Executor *executor)
const size_t inputCount() const
Definition: RelAlgDag.h:890
Executor * executor_
std::shared_ptr< Analyzer::Expr > fold_expr(const Analyzer::Expr *expr)

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

WorkUnit RelAlgExecutor::createJoinWorkUnit ( const RelJoin * ,
const SortInfo & ,
const bool  just_explain 
)
private
RelAlgExecutor::WorkUnit RelAlgExecutor::createProjectWorkUnit ( const RelProject * project,
const SortInfo & sort_info,
const ExecutionOptions & eo 
)
private

Definition at line 4856 of file RelAlgExecutor.cpp.

References RegisteredQueryHint::defaults(), anonymous_namespace{RelAlgExecutor.cpp}::do_table_reordering(), executor_, ExecutionOptions::executor_type, QueryPlanDagExtractor::extractJoinInfo(), g_default_max_groups_buffer_entry_guess, g_from_table_reordering, anonymous_namespace{RelAlgExecutor.cpp}::get_input_desc(), anonymous_namespace{RelAlgExecutor.cpp}::get_input_nest_levels(), anonymous_namespace{RelAlgExecutor.cpp}::get_join_type(), anonymous_namespace{RelAlgExecutor.cpp}::get_left_deep_join_input_sizes(), anonymous_namespace{RelAlgExecutor.cpp}::get_raw_pointers(), get_table_infos(), anonymous_namespace{RelAlgExecutor.cpp}::get_targets_meta(), RelAlgNode::getInput(), getLeftDeepJoinTreesInfo(), RelAlgNode::getQueryPlanDagHash(), anonymous_namespace{RelAlgExecutor.cpp}::has_valid_query_plan_dag(), ExecutionOptions::just_explain, anonymous_namespace{RelAlgExecutor.cpp}::left_deep_join_types(), now_, query_dag_, query_state_, RelAlgNode::setOutputMetainfo(), target_exprs_owned_, anonymous_namespace{RelAlgExecutor.cpp}::translate_scalar_sources(), and translateLeftDeepJoinFilter().

Referenced by createWorkUnit(), executeDelete(), executeProject(), executeUpdate(), and getOuterFragmentCount().

4859  {
4860  std::vector<InputDescriptor> input_descs;
4861  std::list<std::shared_ptr<const InputColDescriptor>> input_col_descs;
4862  auto input_to_nest_level = get_input_nest_levels(project, {});
4863  std::tie(input_descs, input_col_descs, std::ignore) =
4864  get_input_desc(project, input_to_nest_level, {});
4865  const auto query_infos = get_table_infos(input_descs, executor_);
4866 
4867  const auto left_deep_join =
4868  dynamic_cast<const RelLeftDeepInnerJoin*>(project->getInput(0));
4869  JoinQualsPerNestingLevel left_deep_join_quals;
4870  const auto join_types = left_deep_join ? left_deep_join_types(left_deep_join)
4871  : std::vector<JoinType>{get_join_type(project)};
4872  std::vector<size_t> input_permutation;
4873  std::vector<size_t> left_deep_join_input_sizes;
4874  std::optional<unsigned> left_deep_tree_id;
4875  if (left_deep_join) {
4876  left_deep_tree_id = left_deep_join->getId();
4877  left_deep_join_input_sizes = get_left_deep_join_input_sizes(left_deep_join);
4878  const auto query_infos = get_table_infos(input_descs, executor_);
4879  left_deep_join_quals = translateLeftDeepJoinFilter(
4880  left_deep_join, input_descs, input_to_nest_level, eo.just_explain);
4881  if (g_from_table_reordering) {
4882  input_permutation = do_table_reordering(input_descs,
4883  input_col_descs,
4884  left_deep_join_quals,
4885  input_to_nest_level,
4886  project,
4887  query_infos,
4888  executor_);
4889  input_to_nest_level = get_input_nest_levels(project, input_permutation);
4890  std::tie(input_descs, input_col_descs, std::ignore) =
4891  get_input_desc(project, input_to_nest_level, input_permutation);
4892  left_deep_join_quals = translateLeftDeepJoinFilter(
4893  left_deep_join, input_descs, input_to_nest_level, eo.just_explain);
4894  }
4895  }
4896 
4897  RelAlgTranslator translator(
4898  query_state_, executor_, input_to_nest_level, join_types, now_, eo.just_explain);
4899  const auto target_exprs_owned =
4900  translate_scalar_sources(project, translator, eo.executor_type);
4901 
4902  target_exprs_owned_.insert(
4903  target_exprs_owned_.end(), target_exprs_owned.begin(), target_exprs_owned.end());
4904  const auto target_exprs = get_raw_pointers(target_exprs_owned);
4905  auto query_hint = RegisteredQueryHint::defaults();
4906  if (query_dag_) {
4907  auto candidate = query_dag_->getQueryHint(project);
4908  if (candidate) {
4909  query_hint = *candidate;
4910  }
4911  }
4912  const RelAlgExecutionUnit exe_unit = {input_descs,
4913  input_col_descs,
4914  {},
4915  {},
4916  left_deep_join_quals,
4917  {nullptr},
4918  target_exprs,
4919  {},
4920  nullptr,
4921  sort_info,
4922  0,
4923  query_hint,
4924  project->getQueryPlanDagHash(),
4925  {},
4926  {},
4927  false,
4928  std::nullopt,
4929  query_state_};
4930  auto query_rewriter = std::make_unique<QueryRewriter>(query_infos, executor_);
4931  auto rewritten_exe_unit = query_rewriter->rewrite(exe_unit);
4932  const auto targets_meta = get_targets_meta(project, rewritten_exe_unit.target_exprs);
4933  project->setOutputMetainfo(targets_meta);
4934  auto& left_deep_trees_info = getLeftDeepJoinTreesInfo();
4935  if (left_deep_tree_id && left_deep_tree_id.has_value()) {
4936  left_deep_trees_info.emplace(left_deep_tree_id.value(),
4937  rewritten_exe_unit.join_quals);
4938  }
4939  if (has_valid_query_plan_dag(project)) {
4940  auto join_info = QueryPlanDagExtractor::extractJoinInfo(
4941  project, left_deep_tree_id, left_deep_trees_info, executor_);
4942  rewritten_exe_unit.hash_table_build_plan_dag = join_info.hash_table_plan_dag;
4943  rewritten_exe_unit.table_id_to_node_map = join_info.table_id_to_node_map;
4944  }
4945  return {rewritten_exe_unit,
4946  project,
4947  g_default_max_groups_buffer_entry_guess,
4948  std::move(query_rewriter),
4949  input_permutation,
4950  left_deep_join_input_sizes};
4951 }
void setOutputMetainfo(std::vector< TargetMetaInfo > targets_metainfo) const
Definition: RelAlgDag.h:861
std::vector< Analyzer::Expr * > get_raw_pointers(std::vector< std::shared_ptr< Analyzer::Expr >> const &input)
std::unordered_map< const RelAlgNode *, int > get_input_nest_levels(const RelAlgNode *ra_node, const std::vector< size_t > &input_permutation)
std::vector< JoinType > left_deep_join_types(const RelLeftDeepInnerJoin *left_deep_join)
JoinType
Definition: sqldefs.h:165
bool has_valid_query_plan_dag(const RelAlgNode *node)
static ExtractedJoinInfo extractJoinInfo(const RelAlgNode *top_node, std::optional< unsigned > left_deep_tree_id, std::unordered_map< unsigned, JoinQualsPerNestingLevel > left_deep_tree_infos, Executor *executor)
std::vector< size_t > do_table_reordering(std::vector< InputDescriptor > &input_descs, std::list< std::shared_ptr< const InputColDescriptor >> &input_col_descs, const JoinQualsPerNestingLevel &left_deep_join_quals, std::unordered_map< const RelAlgNode *, int > &input_to_nest_level, const RA *node, const std::vector< InputTableInfo > &query_infos, const Executor *executor)
std::tuple< std::vector< InputDescriptor >, std::list< std::shared_ptr< const InputColDescriptor > >, std::vector< std::shared_ptr< RexInput > > > get_input_desc(const RA *ra_node, const std::unordered_map< const RelAlgNode *, int > &input_to_nest_level, const std::vector< size_t > &input_permutation)
std::unordered_map< unsigned, JoinQualsPerNestingLevel > & getLeftDeepJoinTreesInfo()
std::vector< JoinCondition > JoinQualsPerNestingLevel
size_t getQueryPlanDagHash() const
Definition: RelAlgDag.h:874
std::vector< std::shared_ptr< Analyzer::Expr > > target_exprs_owned_
bool g_from_table_reordering
Definition: Execute.cpp:90
ExecutorType executor_type
const RelAlgNode * getInput(const size_t idx) const
Definition: RelAlgDag.h:892
JoinType get_join_type(const RelAlgNode *ra)
std::vector< TargetMetaInfo > get_targets_meta(const RA *ra_node, const std::vector< Analyzer::Expr * > &target_exprs)
std::unique_ptr< RelAlgDag > query_dag_
static RegisteredQueryHint defaults()
Definition: QueryHint.h:329
std::shared_ptr< const query_state::QueryState > query_state_
std::vector< std::shared_ptr< Analyzer::Expr > > translate_scalar_sources(const RA *ra_node, const RelAlgTranslator &translator, const ::ExecutorType executor_type)
size_t g_default_max_groups_buffer_entry_guess
Definition: Execute.cpp:109
std::vector< InputTableInfo > get_table_infos(const std::vector< InputDescriptor > &input_descs, Executor *executor)
Executor * executor_
JoinQualsPerNestingLevel translateLeftDeepJoinFilter(const RelLeftDeepInnerJoin *join, const std::vector< InputDescriptor > &input_descs, const std::unordered_map< const RelAlgNode *, int > &input_to_nest_level, const bool just_explain)
std::vector< size_t > get_left_deep_join_input_sizes(const RelLeftDeepInnerJoin *left_deep_join)

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

RelAlgExecutor::WorkUnit RelAlgExecutor::createSortInputWorkUnit ( const RelSort * sort,
std::list< Analyzer::OrderEntry > &  order_entries,
const ExecutionOptions & eo 
)
private

Definition at line 3430 of file RelAlgExecutor.cpp.

References CHECK_GT, RelSort::collationCount(), SpeculativeTopNBlacklist::contains(), createWorkUnit(), Default, anonymous_namespace{RelAlgExecutor.cpp}::first_oe_is_desc(), g_default_max_groups_buffer_entry_guess, anonymous_namespace{RelAlgExecutor.cpp}::get_scan_limit(), get_target_info(), RelAlgNode::getInput(), RelSort::getLimit(), RelSort::getOffset(), RelAlgExecutionUnit::input_descs, RelSort::isLimitDelivered(), RelAlgNode::setOutputMetainfo(), speculative_topn_blacklist_, SpeculativeTopN, and StreamingTopN.

Referenced by executeRelAlgQuerySingleStep(), and executeSort().

3433  {
3434  const auto source = sort->getInput(0);
3435  const size_t limit = sort->getLimit();
3436  const size_t offset = sort->getOffset();
3437  const size_t scan_limit = sort->collationCount() ? 0 : get_scan_limit(source, limit);
3438  const size_t scan_total_limit =
3439  scan_limit ? get_scan_limit(source, scan_limit + offset) : 0;
3440  size_t max_groups_buffer_entry_guess{
3441  scan_total_limit ? scan_total_limit : g_default_max_groups_buffer_entry_guess};
3442  SortAlgorithm sort_algorithm{SortAlgorithm::SpeculativeTopN};
3443  SortInfo sort_info{
3444  order_entries, sort_algorithm, limit, offset, sort->isLimitDelivered()};
3445  auto source_work_unit = createWorkUnit(source, sort_info, eo);
3446  const auto& source_exe_unit = source_work_unit.exe_unit;
3447 
3448  // we do not allow sorting geometry or array types
3449  for (auto order_entry : order_entries) {
3450  CHECK_GT(order_entry.tle_no, 0); // tle_no is a 1-base index
3451  const auto& te = source_exe_unit.target_exprs[order_entry.tle_no - 1];
3452  const auto& ti = get_target_info(te, false);
3453  if (ti.sql_type.is_geometry() || ti.sql_type.is_array()) {
3454  throw std::runtime_error(
3455  "Columns with geometry or array types cannot be used in an ORDER BY clause.");
3456  }
3457  }
3458 
3459  if (source_exe_unit.groupby_exprs.size() == 1) {
3460  if (!source_exe_unit.groupby_exprs.front()) {
3461  sort_algorithm = SortAlgorithm::StreamingTopN;
3462  } else {
3463  if (speculative_topn_blacklist_.contains(source_exe_unit.groupby_exprs.front(),
3464  first_oe_is_desc(order_entries))) {
3465  sort_algorithm = SortAlgorithm::Default;
3466  }
3467  }
3468  }
3469 
3470  sort->setOutputMetainfo(source->getOutputMetainfo());
3471  // NB: the `body` field of the returned `WorkUnit` needs to be the `source` node,
3472  // not the `sort`. The aggregator needs the pre-sorted result from leaves.
3473  return {RelAlgExecutionUnit{source_exe_unit.input_descs,
3474  std::move(source_exe_unit.input_col_descs),
3475  source_exe_unit.simple_quals,
3476  source_exe_unit.quals,
3477  source_exe_unit.join_quals,
3478  source_exe_unit.groupby_exprs,
3479  source_exe_unit.target_exprs,
3480  source_exe_unit.target_exprs_original_type_infos,
3481  nullptr,
3482  {sort_info.order_entries,
3483  sort_algorithm,
3484  limit,
3485  offset,
3486  sort_info.limit_delivered},
3487  scan_total_limit,
3488  source_exe_unit.query_hint,
3489  source_exe_unit.query_plan_dag_hash,
3490  source_exe_unit.hash_table_build_plan_dag,
3491  source_exe_unit.table_id_to_node_map,
3492  source_exe_unit.use_bump_allocator,
3493  source_exe_unit.union_all,
3494  source_exe_unit.query_state},
3495  source,
3496  max_groups_buffer_entry_guess,
3497  std::move(source_work_unit.query_rewriter),
3498  source_work_unit.input_permutation,
3499  source_work_unit.left_deep_join_input_sizes};
3500 }
void setOutputMetainfo(std::vector< TargetMetaInfo > targets_metainfo) const
Definition: RelAlgDag.h:861
size_t getOffset() const
Definition: RelAlgDag.h:1971
bool contains(const std::shared_ptr< Analyzer::Expr > expr, const bool desc) const
size_t get_scan_limit(const RelAlgNode *ra, const size_t limit)
static SpeculativeTopNBlacklist speculative_topn_blacklist_
std::vector< InputDescriptor > input_descs
#define CHECK_GT(x, y)
Definition: Logger.h:305
TargetInfo get_target_info(const Analyzer::Expr *target_expr, const bool bigint_count)
Definition: TargetInfo.h:88
WorkUnit createWorkUnit(const RelAlgNode *, const SortInfo &, const ExecutionOptions &eo)
const RelAlgNode * getInput(const size_t idx) const
Definition: RelAlgDag.h:892
size_t collationCount() const
Definition: RelAlgDag.h:1952
size_t getLimit() const
Definition: RelAlgDag.h:1969
size_t g_default_max_groups_buffer_entry_guess
Definition: Execute.cpp:109
bool isLimitDelivered() const
Definition: RelAlgDag.h:1967
bool first_oe_is_desc(const std::list< Analyzer::OrderEntry > &order_entries)

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

RelAlgExecutor::TableFunctionWorkUnit RelAlgExecutor::createTableFunctionWorkUnit ( const RelTableFunction * table_func,
const bool  just_explain,
const bool  is_gpu 
)
private

Definition at line 5084 of file RelAlgExecutor.cpp.

References bind_table_function(), CHECK, CHECK_EQ, CHECK_GT, CHECK_LT, RelTableFunction::countRexLiteralArgs(), DEFAULT_ROW_MULTIPLIER_VALUE, executor_, ext_arg_type_to_type_info(), anonymous_namespace{RelAlgExecutor.cpp}::get_input_desc(), anonymous_namespace{RelAlgExecutor.cpp}::get_input_nest_levels(), anonymous_namespace{RelAlgExecutor.cpp}::get_raw_pointers(), get_table_infos(), anonymous_namespace{RelAlgExecutor.cpp}::get_targets_meta(), RelTableFunction::getColInputsSize(), RelTableFunction::getFunctionName(), RelTableFunction::getTableFuncInputAt(), kENCODING_ARRAY, kINT, LOG, now_, query_state_, RelAlgNode::setOutputMetainfo(), TableFunctions, TableFunctionExecutionUnit::target_exprs, target_exprs_owned_, to_string(), TRANSIENT_DICT_ID, anonymous_namespace{RelAlgExecutor.cpp}::translate_scalar_sources(), UNREACHABLE, and logger::WARNING.

Referenced by executeTableFunction().

5087  {
5088  std::vector<InputDescriptor> input_descs;
5089  std::list<std::shared_ptr<const InputColDescriptor>> input_col_descs;
5090  auto input_to_nest_level = get_input_nest_levels(rel_table_func, {});
5091  std::tie(input_descs, input_col_descs, std::ignore) =
5092  get_input_desc(rel_table_func, input_to_nest_level, {});
5093  const auto query_infos = get_table_infos(input_descs, executor_);
5094  RelAlgTranslator translator(
5095  query_state_, executor_, input_to_nest_level, {}, now_, just_explain);
5096  auto input_exprs_owned = translate_scalar_sources(
5097  rel_table_func, translator, ::ExecutorType::TableFunctions);
5098  target_exprs_owned_.insert(
5099  target_exprs_owned_.end(), input_exprs_owned.begin(), input_exprs_owned.end());
5100 
5101  const auto table_function_impl_and_type_infos = [=]() {
5102  if (is_gpu) {
5103  try {
5104  return bind_table_function(
5105  rel_table_func->getFunctionName(), input_exprs_owned, is_gpu);
5106  } catch (ExtensionFunctionBindingError& e) {
5107  LOG(WARNING) << "createTableFunctionWorkUnit[GPU]: " << e.what()
5108  << " Redirecting " << rel_table_func->getFunctionName()
5109  << " step to run on CPU.";
5110  throw QueryMustRunOnCpu();
5111  }
5112  } else {
5113  try {
5114  return bind_table_function(
5115  rel_table_func->getFunctionName(), input_exprs_owned, is_gpu);
5116  } catch (ExtensionFunctionBindingError& e) {
5117  LOG(WARNING) << "createTableFunctionWorkUnit[CPU]: " << e.what();
5118  throw;
5119  }
5120  }
5121  }();
5122  const auto& table_function_impl = std::get<0>(table_function_impl_and_type_infos);
5123  const auto& table_function_type_infos = std::get<1>(table_function_impl_and_type_infos);
5124  size_t output_row_sizing_param = 0;
5125  if (table_function_impl
5126  .hasUserSpecifiedOutputSizeParameter()) { // constant and row multiplier
5127  const auto parameter_index =
5128  table_function_impl.getOutputRowSizeParameter(table_function_type_infos);
5129  CHECK_GT(parameter_index, size_t(0));
5130  if (rel_table_func->countRexLiteralArgs() == table_function_impl.countScalarArgs()) {
5131  const auto parameter_expr =
5132  rel_table_func->getTableFuncInputAt(parameter_index - 1);
5133  const auto parameter_expr_literal = dynamic_cast<const RexLiteral*>(parameter_expr);
5134  if (!parameter_expr_literal) {
5135  throw std::runtime_error(
5136  "Provided output buffer sizing parameter is not a literal. Only literal "
5137  "values are supported with output buffer sizing configured table "
5138  "functions.");
5139  }
5140  int64_t literal_val = parameter_expr_literal->getVal<int64_t>();
5141  if (literal_val < 0) {
5142  throw std::runtime_error("Provided output sizing parameter " +
5143  std::to_string(literal_val) +
5144  " must be positive integer.");
5145  }
5146  output_row_sizing_param = static_cast<size_t>(literal_val);
5147  } else {
5148  // RowMultiplier not specified in the SQL query. Set it to 1
5149  output_row_sizing_param = 1; // default value for RowMultiplier
5150  static Datum d = {DEFAULT_ROW_MULTIPLIER_VALUE};
5151  static Analyzer::ExpressionPtr DEFAULT_ROW_MULTIPLIER_EXPR =
5152  makeExpr<Analyzer::Constant>(kINT, false, d);
5153  // Push the constant 1 to input_exprs
5154  input_exprs_owned.insert(input_exprs_owned.begin() + parameter_index - 1,
5155  DEFAULT_ROW_MULTIPLIER_EXPR);
5156  }
5157  } else if (table_function_impl.hasNonUserSpecifiedOutputSize()) {
5158  output_row_sizing_param = table_function_impl.getOutputRowSizeParameter();
5159  } else {
5160  UNREACHABLE();
5161  }
5162 
5163  std::vector<Analyzer::ColumnVar*> input_col_exprs;
5164  size_t input_index = 0;
5165  size_t arg_index = 0;
5166  const auto table_func_args = table_function_impl.getInputArgs();
5167  CHECK_EQ(table_func_args.size(), table_function_type_infos.size());
5168  for (const auto& ti : table_function_type_infos) {
5169  if (ti.is_column_list()) {
5170  for (int i = 0; i < ti.get_dimension(); i++) {
5171  auto& input_expr = input_exprs_owned[input_index];
5172  auto col_var = dynamic_cast<Analyzer::ColumnVar*>(input_expr.get());
5173  CHECK(col_var);
5174 
5175  // avoid setting type info to ti here since ti doesn't have all the
5176  // properties correctly set
5177  auto type_info = input_expr->get_type_info();
5178  if (ti.is_column_array()) {
5179  type_info.set_compression(kENCODING_ARRAY);
5180  type_info.set_subtype(type_info.get_subtype()); // set type to be subtype
5181  } else {
5182  type_info.set_subtype(type_info.get_type()); // set type to be subtype
5183  }
5184  type_info.set_type(ti.get_type()); // set type to column list
5185  type_info.set_dimension(ti.get_dimension());
5186  input_expr->set_type_info(type_info);
5187 
5188  input_col_exprs.push_back(col_var);
5189  input_index++;
5190  }
5191  } else if (ti.is_column()) {
5192  auto& input_expr = input_exprs_owned[input_index];
5193  auto col_var = dynamic_cast<Analyzer::ColumnVar*>(input_expr.get());
5194  CHECK(col_var);
5195  // same here! avoid setting type info to ti since it doesn't have all the
5196  // properties correctly set
5197  auto type_info = input_expr->get_type_info();
5198  if (ti.is_column_array()) {
5199  type_info.set_compression(kENCODING_ARRAY);
5200  type_info.set_subtype(type_info.get_subtype()); // set type to be subtype
5201  } else {
5202  type_info.set_subtype(type_info.get_type()); // set type to be subtype
5203  }
5204  type_info.set_type(ti.get_type()); // set type to column
5205  input_expr->set_type_info(type_info);
5206  input_col_exprs.push_back(col_var);
5207  input_index++;
5208  } else {
5209  auto input_expr = input_exprs_owned[input_index];
5210  auto ext_func_arg_ti = ext_arg_type_to_type_info(table_func_args[arg_index]);
5211  if (ext_func_arg_ti != input_expr->get_type_info()) {
5212  input_exprs_owned[input_index] = input_expr->add_cast(ext_func_arg_ti);
5213  }
5214  input_index++;
5215  }
5216  arg_index++;
5217  }
5218  CHECK_EQ(input_col_exprs.size(), rel_table_func->getColInputsSize());
5219  std::vector<Analyzer::Expr*> table_func_outputs;
5220  constexpr int32_t transient_pos{-1};
5221  for (size_t i = 0; i < table_function_impl.getOutputsSize(); i++) {
5222  auto ti = table_function_impl.getOutputSQLType(i);
5223  if (ti.is_dict_encoded_string() || ti.is_text_encoding_dict_array()) {
5224  auto p = table_function_impl.getInputID(i);
5225 
5226  int32_t input_pos = p.first;
5227  if (input_pos == transient_pos) {
5228  ti.set_comp_param(TRANSIENT_DICT_ID);
5229  } else {
5230  // Iterate over the list of arguments to compute the offset. Use this offset to
5231  // get the corresponding input
5232  int32_t offset = 0;
5233  for (int j = 0; j < input_pos; j++) {
5234  const auto ti = table_function_type_infos[j];
5235  offset += ti.is_column_list() ? ti.get_dimension() : 1;
5236  }
5237  input_pos = offset + p.second;
5238 
5239  CHECK_LT(input_pos, input_exprs_owned.size());
5240  const auto& dict_key =
5241  input_exprs_owned[input_pos]->get_type_info().getStringDictKey();
5242  ti.set_comp_param(dict_key.dict_id);
5243  ti.setStringDictKey(dict_key);
5244  }
5245  }
5246  target_exprs_owned_.push_back(std::make_shared<Analyzer::ColumnVar>(
5247  ti, shared::ColumnKey{0, 0, int32_t(i)}, -1));
5248  table_func_outputs.push_back(target_exprs_owned_.back().get());
5249  }
5250  auto input_exprs = get_raw_pointers(input_exprs_owned);
5251  const TableFunctionExecutionUnit exe_unit = {
5252  input_descs,
5253  input_col_descs,
5254  input_exprs, // table function inputs
5255  input_col_exprs, // table function column inputs (duplicates w/ above)
5256  table_func_outputs, // table function projected exprs
5257  output_row_sizing_param, // output buffer sizing param
5258  table_function_impl};
5259  const auto targets_meta = get_targets_meta(rel_table_func, exe_unit.target_exprs);
5260  rel_table_func->setOutputMetainfo(targets_meta);
5261  return {exe_unit, rel_table_func};
5262 }
#define CHECK_EQ(x, y)
Definition: Logger.h:301
std::vector< Analyzer::Expr * > get_raw_pointers(std::vector< std::shared_ptr< Analyzer::Expr >> const &input)
std::unordered_map< const RelAlgNode *, int > get_input_nest_levels(const RelAlgNode *ra_node, const std::vector< size_t > &input_permutation)
std::tuple< std::vector< InputDescriptor >, std::list< std::shared_ptr< const InputColDescriptor > >, std::vector< std::shared_ptr< RexInput > > > get_input_desc(const RA *ra_node, const std::unordered_map< const RelAlgNode *, int > &input_to_nest_level, const std::vector< size_t > &input_permutation)
#define LOG(tag)
Definition: Logger.h:285
#define UNREACHABLE()
Definition: Logger.h:337
std::shared_ptr< Analyzer::Expr > ExpressionPtr
Definition: Analyzer.h:184
#define TRANSIENT_DICT_ID
Definition: DbObjectKeys.h:24
#define CHECK_GT(x, y)
Definition: Logger.h:305
std::string to_string(char const *&&v)
std::vector< std::shared_ptr< Analyzer::Expr > > target_exprs_owned_
std::vector< TargetMetaInfo > get_targets_meta(const RA *ra_node, const std::vector< Analyzer::Expr * > &target_exprs)
#define CHECK_LT(x, y)
Definition: Logger.h:303
#define DEFAULT_ROW_MULTIPLIER_VALUE
std::shared_ptr< const query_state::QueryState > query_state_
std::vector< std::shared_ptr< Analyzer::Expr > > translate_scalar_sources(const RA *ra_node, const RelAlgTranslator &translator, const ::ExecutorType executor_type)
const std::tuple< table_functions::TableFunction, std::vector< SQLTypeInfo > > bind_table_function(std::string name, Analyzer::ExpressionPtrVector input_args, const std::vector< table_functions::TableFunction > &table_funcs, const bool is_gpu)
#define CHECK(condition)
Definition: Logger.h:291
std::vector< InputTableInfo > get_table_infos(const std::vector< InputDescriptor > &input_descs, Executor *executor)
Definition: sqltypes.h:62
std::vector< Analyzer::Expr * > target_exprs
Definition: Datum.h:67
Executor * executor_
SQLTypeInfo ext_arg_type_to_type_info(const ExtArgumentType ext_arg_type)

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

RelAlgExecutor::WorkUnit RelAlgExecutor::createUnionWorkUnit ( const RelLogicalUnion logical_union,
const SortInfo sort_info,
const ExecutionOptions eo 
)
private

Definition at line 4977 of file RelAlgExecutor.cpp.

References gpu_enabled::accumulate(), shared::append_move(), CHECK, RelRexToStringConfig::defaults(), RegisteredQueryHint::defaults(), EMPTY_HASHED_PLAN_DAG_KEY, executor_, g_default_max_groups_buffer_entry_guess, anonymous_namespace{RelAlgExecutor.cpp}::get_input_desc(), anonymous_namespace{RelAlgExecutor.cpp}::get_input_nest_levels(), anonymous_namespace{RelAlgExecutor.cpp}::get_raw_pointers(), get_table_infos(), anonymous_namespace{RelAlgExecutor.cpp}::get_targets_meta(), RelAlgNode::getInput(), RelAlgNode::getOutputMetainfo(), RelLogicalUnion::isAll(), shared::printContainer(), query_state_, RelAlgNode::setOutputMetainfo(), anonymous_namespace{RelAlgExecutor.cpp}::target_exprs_for_union(), target_exprs_owned_, RelAlgNode::toString(), and VLOG.

Referenced by executeUnion().

4980  {
4981  std::vector<InputDescriptor> input_descs;
4982  std::list<std::shared_ptr<const InputColDescriptor>> input_col_descs;
4983  // Map ra input ptr to index (0, 1).
4984  auto input_to_nest_level = get_input_nest_levels(logical_union, {});
4985  std::tie(input_descs, input_col_descs, std::ignore) =
4986  get_input_desc(logical_union, input_to_nest_level, {});
4987  const auto query_infos = get_table_infos(input_descs, executor_);
4988  auto const max_num_tuples =
4989  std::accumulate(query_infos.cbegin(),
4990  query_infos.cend(),
4991  size_t(0),
4992  [](auto max, auto const& query_info) {
4993  return std::max(max, query_info.info.getNumTuples());
4994  });
4995 
4996  VLOG(3) << "input_to_nest_level.size()=" << input_to_nest_level.size() << " Pairs are:";
4997  for (auto& pair : input_to_nest_level) {
4998  VLOG(3) << " (" << pair.first->toString(RelRexToStringConfig::defaults()) << ", "
4999  << pair.second << ')';
5000  }
5001 
5002  // For UNION queries, we need to keep the target_exprs from both subqueries since they
5003  // may differ on StringDictionaries.
5004  std::vector<Analyzer::Expr*> target_exprs_pair[2];
5005  for (unsigned i = 0; i < 2; ++i) {
5006  auto input_exprs_owned = target_exprs_for_union(logical_union->getInput(i));
5007  CHECK(!input_exprs_owned.empty())
5008  << "No metainfo found for input node(" << i << ") "
5009  << logical_union->getInput(i)->toString(RelRexToStringConfig::defaults());
5010  VLOG(3) << "i(" << i << ") input_exprs_owned.size()=" << input_exprs_owned.size();
5011  for (auto& input_expr : input_exprs_owned) {
5012  VLOG(3) << " " << input_expr->toString();
5013  }
5014  target_exprs_pair[i] = get_raw_pointers(input_exprs_owned);
5015  shared::append_move(target_exprs_owned_, std::move(input_exprs_owned));
5016  }
5017 
5018  VLOG(3) << "input_descs=" << shared::printContainer(input_descs)
5019  << " input_col_descs=" << shared::printContainer(input_col_descs)
5020  << " target_exprs.size()=" << target_exprs_pair[0].size()
5021  << " max_num_tuples=" << max_num_tuples;
5022 
5023  const RelAlgExecutionUnit exe_unit = {input_descs,
5024  input_col_descs,
5025  {}, // quals_cf.simple_quals,
5026  {}, // rewrite_quals(quals_cf.quals),
5027  {},
5028  {nullptr},
5029  target_exprs_pair[0],
5030  {},
5031  nullptr,
5032  sort_info,
5033  max_num_tuples,
5034  RegisteredQueryHint::defaults(),
5035  EMPTY_HASHED_PLAN_DAG_KEY,
5036  {},
5037  {},
5038  false,
5039  logical_union->isAll(),
5040  query_state_,
5041  target_exprs_pair[1]};
5042  auto query_rewriter = std::make_unique<QueryRewriter>(query_infos, executor_);
5043  const auto rewritten_exe_unit = query_rewriter->rewrite(exe_unit);
5044 
5045  RelAlgNode const* input0 = logical_union->getInput(0);
5046  if (auto const* node = dynamic_cast<const RelCompound*>(input0)) {
5047  logical_union->setOutputMetainfo(
5048  get_targets_meta(node, rewritten_exe_unit.target_exprs));
5049  } else if (auto const* node = dynamic_cast<const RelProject*>(input0)) {
5050  logical_union->setOutputMetainfo(
5051  get_targets_meta(node, rewritten_exe_unit.target_exprs));
5052  } else if (auto const* node = dynamic_cast<const RelLogicalUnion*>(input0)) {
5053  logical_union->setOutputMetainfo(
5054  get_targets_meta(node, rewritten_exe_unit.target_exprs));
5055  } else if (auto const* node = dynamic_cast<const RelAggregate*>(input0)) {
5056  logical_union->setOutputMetainfo(
5057  get_targets_meta(node, rewritten_exe_unit.target_exprs));
5058  } else if (auto const* node = dynamic_cast<const RelScan*>(input0)) {
5059  logical_union->setOutputMetainfo(
5060  get_targets_meta(node, rewritten_exe_unit.target_exprs));
5061  } else if (auto const* node = dynamic_cast<const RelFilter*>(input0)) {
5062  logical_union->setOutputMetainfo(
5063  get_targets_meta(node, rewritten_exe_unit.target_exprs));
5064  } else if (auto const* node = dynamic_cast<const RelLogicalValues*>(input0)) {
5065  logical_union->setOutputMetainfo(
5066  get_targets_meta(node, rewritten_exe_unit.target_exprs));
5067  } else if (dynamic_cast<const RelSort*>(input0)) {
5068  throw QueryNotSupported("LIMIT and OFFSET are not currently supported with UNION.");
5069  } else {
5070  throw QueryNotSupported("Unsupported input type: " +
5072  }
5073  VLOG(3) << "logical_union->getOutputMetainfo()="
5074  << shared::printContainer(logical_union->getOutputMetainfo())
5075  << " rewritten_exe_unit.input_col_descs.front()->getScanDesc().getTableKey()="
5076  << rewritten_exe_unit.input_col_descs.front()->getScanDesc().getTableKey();
5077 
5078  return {rewritten_exe_unit,
5079  logical_union,
5080  g_default_max_groups_buffer_entry_guess,
5081  std::move(query_rewriter)};
5082 }
bool isAll() const
Definition: RelAlgDag.h:2477
void setOutputMetainfo(std::vector< TargetMetaInfo > targets_metainfo) const
Definition: RelAlgDag.h:861
std::vector< Analyzer::Expr * > get_raw_pointers(std::vector< std::shared_ptr< Analyzer::Expr >> const &input)
std::unordered_map< const RelAlgNode *, int > get_input_nest_levels(const RelAlgNode *ra_node, const std::vector< size_t > &input_permutation)
std::tuple< std::vector< InputDescriptor >, std::list< std::shared_ptr< const InputColDescriptor > >, std::vector< std::shared_ptr< RexInput > > > get_input_desc(const RA *ra_node, const std::unordered_map< const RelAlgNode *, int > &input_to_nest_level, const std::vector< size_t > &input_permutation)
constexpr QueryPlanHash EMPTY_HASHED_PLAN_DAG_KEY
size_t append_move(std::vector< T > &destination, std::vector< T > &&source)
Definition: misc.h:77
std::vector< std::shared_ptr< Analyzer::Expr > > target_exprs_owned_
virtual std::string toString(RelRexToStringConfig config) const =0
const RelAlgNode * getInput(const size_t idx) const
Definition: RelAlgDag.h:892
DEVICE auto accumulate(ARGS &&...args)
Definition: gpu_enabled.h:42
std::vector< std::shared_ptr< Analyzer::Expr > > target_exprs_for_union(RelAlgNode const *input_node)
std::vector< TargetMetaInfo > get_targets_meta(const RA *ra_node, const std::vector< Analyzer::Expr * > &target_exprs)
static RegisteredQueryHint defaults()
Definition: QueryHint.h:329
std::shared_ptr< const query_state::QueryState > query_state_
static RelRexToStringConfig defaults()
Definition: RelAlgDag.h:49
size_t g_default_max_groups_buffer_entry_guess
Definition: Execute.cpp:109
#define CHECK(condition)
Definition: Logger.h:291
std::vector< InputTableInfo > get_table_infos(const std::vector< InputDescriptor > &input_descs, Executor *executor)
PrintContainer< CONTAINER > printContainer(CONTAINER &container)
Definition: misc.h:107
const std::vector< TargetMetaInfo > & getOutputMetainfo() const
Definition: RelAlgDag.h:876
Executor * executor_
#define VLOG(n)
Definition: Logger.h:387

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

std::unique_ptr< WindowFunctionContext > RelAlgExecutor::createWindowFunctionContext ( const Analyzer::WindowFunction window_func,
const std::shared_ptr< Analyzer::BinOper > &  partition_key_cond,
std::unordered_map< QueryPlanHash, std::shared_ptr< HashJoin >> &  partition_cache,
std::unordered_map< QueryPlanHash, size_t > &  sorted_partition_key_ref_count_map,
const WorkUnit work_unit,
const std::vector< InputTableInfo > &  query_infos,
const CompilationOptions co,
ColumnCacheMap column_cache_map,
std::shared_ptr< RowSetMemoryOwner row_set_mem_owner 
)
private

Definition at line 2604 of file RelAlgExecutor.cpp.

References RegisteredQueryHint::aggregate_tree_fanout, RelAlgExecutor::WorkUnit::body, CHECK, CHECK_EQ, Data_Namespace::CPU_LEVEL, CompilationOptions::device_type, RelAlgExecutor::WorkUnit::exe_unit, executor_, g_window_function_aggregation_tree_fanout, Analyzer::WindowFunction::getArgs(), Analyzer::WindowFunction::getCollation(), ColumnFetcher::getOneColumnFragment(), Analyzer::WindowFunction::getOrderKeys(), RelAlgNode::getQueryPlanDagHash(), GPU, Data_Namespace::GPU_LEVEL, RelAlgExecutionUnit::hash_table_build_plan_dag, Analyzer::WindowFunction::isFrameNavigateWindowFunction(), OneToMany, RelAlgExecutionUnit::query_hint, RelAlgExecutionUnit::table_id_to_node_map, VLOG, WINDOW_FUNCTION, and WINDOW_FUNCTION_FRAMING.

Referenced by computeWindow().

2613  {
2614  const size_t elem_count = query_infos.front().info.fragments.front().getNumTuples();
2615  const auto memory_level = co.device_type == ExecutorDeviceType::GPU
2616  ? Data_Namespace::GPU_LEVEL
2617  : Data_Namespace::CPU_LEVEL;
2618  std::unique_ptr<WindowFunctionContext> context;
2619  auto partition_cache_key = work_unit.body->getQueryPlanDagHash();
2620  if (partition_key_cond) {
2621  auto partition_cond_str = partition_key_cond->toString();
2622  auto partition_key_hash = boost::hash_value(partition_cond_str);
2623  boost::hash_combine(partition_cache_key, partition_key_hash);
2624  std::shared_ptr<HashJoin> partition_ptr;
2625  auto cached_hash_table_it = partition_cache.find(partition_cache_key);
2626  if (cached_hash_table_it != partition_cache.end()) {
2627  partition_ptr = cached_hash_table_it->second;
2628  VLOG(1) << "Reuse a hash table to compute window function context (key: "
2629  << partition_cache_key << ", partition condition: " << partition_cond_str
2630  << ")";
2631  } else {
2632  JoinType window_partition_type = window_func->isFrameNavigateWindowFunction()
2633  ? JoinType::WINDOW_FUNCTION_FRAMING
2634  : JoinType::WINDOW_FUNCTION;
2635  const auto hash_table_or_err = executor_->buildHashTableForQualifier(
2636  partition_key_cond,
2637  query_infos,
2638  memory_level,
2639  window_partition_type,
2640  HashType::OneToMany,
2641  column_cache_map,
2642  work_unit.exe_unit.hash_table_build_plan_dag,
2643  work_unit.exe_unit.query_hint,
2644  work_unit.exe_unit.table_id_to_node_map);
2645  if (!hash_table_or_err.fail_reason.empty()) {
2646  throw std::runtime_error(hash_table_or_err.fail_reason);
2647  }
2648  CHECK(hash_table_or_err.hash_table->getHashType() == HashType::OneToMany);
2649  partition_ptr = hash_table_or_err.hash_table;
2650  CHECK(partition_cache.insert(std::make_pair(partition_cache_key, partition_ptr))
2651  .second);
2652  VLOG(1) << "Put a generated hash table for computing window function context to "
2653  "cache (key: "
2654  << partition_cache_key << ", partition condition: " << partition_cond_str
2655  << ")";
2656  }
2657  CHECK(partition_ptr);
2658  auto aggregate_tree_fanout = g_window_function_aggregation_tree_fanout;
2659  if (work_unit.exe_unit.query_hint.aggregate_tree_fanout != aggregate_tree_fanout) {
2660  aggregate_tree_fanout = work_unit.exe_unit.query_hint.aggregate_tree_fanout;
2661  VLOG(1) << "Aggregate tree's fanout is set to " << aggregate_tree_fanout;
2662  }
2663  context = std::make_unique<WindowFunctionContext>(window_func,
2664  partition_cache_key,
2665  partition_ptr,
2666  elem_count,
2667  co.device_type,
2668  row_set_mem_owner,
2669  aggregate_tree_fanout);
2670  } else {
2671  context = std::make_unique<WindowFunctionContext>(
2672  window_func, elem_count, co.device_type, row_set_mem_owner);
2673  }
2674  const auto& order_keys = window_func->getOrderKeys();
2675  if (!order_keys.empty()) {
2676  auto sorted_partition_cache_key = partition_cache_key;
2677  for (auto& order_key : order_keys) {
2678  boost::hash_combine(sorted_partition_cache_key, order_key->toString());
2679  }
2680  for (auto& collation : window_func->getCollation()) {
2681  boost::hash_combine(sorted_partition_cache_key, collation.toString());
2682  }
2683  context->setSortedPartitionCacheKey(sorted_partition_cache_key);
2684  auto cache_key_cnt_it =
2685  sorted_partition_key_ref_count_map.try_emplace(sorted_partition_cache_key, 1);
2686  if (!cache_key_cnt_it.second) {
2687  sorted_partition_key_ref_count_map[sorted_partition_cache_key] =
2688  cache_key_cnt_it.first->second + 1;
2689  }
2690 
2691  std::vector<std::shared_ptr<Chunk_NS::Chunk>> chunks_owner;
2692  for (const auto& order_key : order_keys) {
2693  const auto order_col =
2694  std::dynamic_pointer_cast<const Analyzer::ColumnVar>(order_key);
2695  if (!order_col) {
2696  throw std::runtime_error("Only order by columns supported for now");
2697  }
2698  auto const [column, col_elem_count] =
2699  ColumnFetcher::getOneColumnFragment(executor_,
2700  *order_col,
2701  query_infos.front().info.fragments.front(),
2702  memory_level,
2703  0,
2704  nullptr,
2705  /*thread_idx=*/0,
2706  chunks_owner,
2707  column_cache_map);
2708 
2709  CHECK_EQ(col_elem_count, elem_count);
2710  context->addOrderColumn(column, order_col->get_type_info(), chunks_owner);
2711  }
2712  }
2713  if (context->getWindowFunction()->hasFraming()) {
2714  // todo (yoonmin) : if we try to support generic window function expression without
2715  // extra project node, we need to revisit here b/c the current logic assumes that
2716  // window function expression has a single input source
2717  auto& window_function_expression_args = window_func->getArgs();
2718  std::vector<std::shared_ptr<Chunk_NS::Chunk>> chunks_owner;
2719  for (auto& expr : window_function_expression_args) {
2720  if (const auto arg_col_var =
2721  std::dynamic_pointer_cast<const Analyzer::ColumnVar>(expr)) {
2722  auto const [column, col_elem_count] = ColumnFetcher::getOneColumnFragment(
2723  executor_,
2724  *arg_col_var,
2725  query_infos.front().info.fragments.front(),
2726  memory_level,
2727  0,
2728  nullptr,
2729  /*thread_idx=*/0,
2730  chunks_owner,
2731  column_cache_map);
2732  CHECK_EQ(col_elem_count, elem_count);
2733  context->addColumnBufferForWindowFunctionExpression(column, chunks_owner);
2734  }
2735  }
2736  }
2737  return context;
2738 }
#define CHECK_EQ(x, y)
Definition: Logger.h:301
JoinType
Definition: sqldefs.h:165
bool isFrameNavigateWindowFunction() const
Definition: Analyzer.h:2630
const std::vector< std::shared_ptr< Analyzer::Expr > > & getOrderKeys() const
Definition: Analyzer.h:2584
const std::vector< OrderEntry > & getCollation() const
Definition: Analyzer.h:2602
size_t g_window_function_aggregation_tree_fanout
const std::vector< std::shared_ptr< Analyzer::Expr > > & getArgs() const
Definition: Analyzer.h:2578
ExecutorDeviceType device_type
static std::pair< const int8_t *, size_t > getOneColumnFragment(Executor *executor, const Analyzer::ColumnVar &hash_col, const Fragmenter_Namespace::FragmentInfo &fragment, const Data_Namespace::MemoryLevel effective_mem_lvl, const int device_id, DeviceAllocator *device_allocator, const size_t thread_idx, std::vector< std::shared_ptr< Chunk_NS::Chunk >> &chunks_owner, ColumnCacheMap &column_cache)
Gets one chunk's pointer and element count on either CPU or GPU.
#define CHECK(condition)
Definition: Logger.h:291
Executor * executor_
#define VLOG(n)
Definition: Logger.h:387

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

RelAlgExecutor::WorkUnit RelAlgExecutor::createWorkUnit ( const RelAlgNode node,
const SortInfo sort_info,
const ExecutionOptions eo 
)
private

Definition at line 4297 of file RelAlgExecutor.cpp.

References createAggregateWorkUnit(), createCompoundWorkUnit(), createFilterWorkUnit(), createProjectWorkUnit(), RelRexToStringConfig::defaults(), logger::FATAL, ExecutionOptions::just_explain, LOG, and RelAlgNode::toString().

Referenced by createSortInputWorkUnit(), and getJoinInfo().

4299  {
4300  const auto compound = dynamic_cast<const RelCompound*>(node);
4301  if (compound) {
4302  return createCompoundWorkUnit(compound, sort_info, eo);
4303  }
4304  const auto project = dynamic_cast<const RelProject*>(node);
4305  if (project) {
4306  return createProjectWorkUnit(project, sort_info, eo);
4307  }
4308  const auto aggregate = dynamic_cast<const RelAggregate*>(node);
4309  if (aggregate) {
4310  return createAggregateWorkUnit(aggregate, sort_info, eo.just_explain);
4311  }
4312  const auto filter = dynamic_cast<const RelFilter*>(node);
4313  if (filter) {
4314  return createFilterWorkUnit(filter, sort_info, eo.just_explain);
4315  }
4316  LOG(FATAL) << "Unhandled node type: "
4317  << node->toString(RelRexToStringConfig::defaults());
4318  return {};
4319 }
WorkUnit createProjectWorkUnit(const RelProject *, const SortInfo &, const ExecutionOptions &eo)
WorkUnit createAggregateWorkUnit(const RelAggregate *, const SortInfo &, const bool just_explain)
#define LOG(tag)
Definition: Logger.h:285
WorkUnit createCompoundWorkUnit(const RelCompound *, const SortInfo &, const ExecutionOptions &eo)
virtual std::string toString(RelRexToStringConfig config) const =0
static RelRexToStringConfig defaults()
Definition: RelAlgDag.h:49
WorkUnit createFilterWorkUnit(const RelFilter *, const SortInfo &, const bool just_explain)

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void RelAlgExecutor::eraseFromTemporaryTables ( const int  table_id)
inlineprivate

Definition at line 403 of file RelAlgExecutor.h.

References temporary_tables_.

403 { temporary_tables_.erase(table_id); }
TemporaryTables temporary_tables_
ExecutionResult RelAlgExecutor::executeAggregate ( const RelAggregate aggregate,
const CompilationOptions co,
const ExecutionOptions eo,
RenderInfo render_info,
const int64_t  queue_time_ms 
)
private

Definition at line 2340 of file RelAlgExecutor.cpp.

References createAggregateWorkUnit(), DEBUG_TIMER, Default, executeWorkUnit(), RelAlgNode::getOutputMetainfo(), and ExecutionOptions::just_explain.

Referenced by executeRelAlgStep().

2344  {
2345  auto timer = DEBUG_TIMER(__func__);
2346  const auto work_unit = createAggregateWorkUnit(
2347  aggregate, {{}, SortAlgorithm::Default, 0, 0}, eo.just_explain);
2348  return executeWorkUnit(work_unit,
2349  aggregate->getOutputMetainfo(),
2350  true,
2351  co,
2352  eo,
2353  render_info,
2354  queue_time_ms);
2355 }
WorkUnit createAggregateWorkUnit(const RelAggregate *, const SortInfo &, const bool just_explain)
ExecutionResult executeWorkUnit(const WorkUnit &work_unit, const std::vector< TargetMetaInfo > &targets_meta, const bool is_agg, const CompilationOptions &co_in, const ExecutionOptions &eo_in, RenderInfo *, const int64_t queue_time_ms, const std::optional< size_t > previous_count=std::nullopt)
#define DEBUG_TIMER(name)
Definition: Logger.h:411
const std::vector< TargetMetaInfo > & getOutputMetainfo() const
Definition: RelAlgDag.h:876

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

ExecutionResult RelAlgExecutor::executeCompound ( const RelCompound compound,
const CompilationOptions co,
const ExecutionOptions eo,
RenderInfo render_info,
const int64_t  queue_time_ms 
)
private

Definition at line 2322 of file RelAlgExecutor.cpp.

References createCompoundWorkUnit(), DEBUG_TIMER, Default, executeWorkUnit(), RelAlgNode::getOutputMetainfo(), and RelCompound::isAggregate().

Referenced by executeRelAlgStep().

2326  {
2327  auto timer = DEBUG_TIMER(__func__);
2328  const auto work_unit =
2329  createCompoundWorkUnit(compound, {{}, SortAlgorithm::Default, 0, 0}, eo);
2330  CompilationOptions co_compound = co;
2331  return executeWorkUnit(work_unit,
2332  compound->getOutputMetainfo(),
2333  compound->isAggregate(),
2334  co_compound,
2335  eo,
2336  render_info,
2337  queue_time_ms);
2338 }
WorkUnit createCompoundWorkUnit(const RelCompound *, const SortInfo &, const ExecutionOptions &eo)
ExecutionResult executeWorkUnit(const WorkUnit &work_unit, const std::vector< TargetMetaInfo > &targets_meta, const bool is_agg, const CompilationOptions &co_in, const ExecutionOptions &eo_in, RenderInfo *, const int64_t queue_time_ms, const std::optional< size_t > previous_count=std::nullopt)
bool isAggregate() const
Definition: RelAlgDag.h:1873
#define DEBUG_TIMER(name)
Definition: Logger.h:411
const std::vector< TargetMetaInfo > & getOutputMetainfo() const
Definition: RelAlgDag.h:876

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void RelAlgExecutor::executeDelete ( const RelAlgNode node,
const CompilationOptions co,
const ExecutionOptions eo_in,
const int64_t  queue_time_ms 
)
private

Definition at line 2215 of file RelAlgExecutor.cpp.

References CHECK, CHECK_EQ, Executor::clearExternalCaches(), createCompoundWorkUnit(), createProjectWorkUnit(), DEBUG_TIMER, Default, RelRexToStringConfig::defaults(), dml_transaction_parameters_, executor_, CompilationOptions::filter_on_deleted_column, get_table_infos(), get_temporary_table(), QueryExecutionError::getErrorCode(), getErrorMessageFromCode(), CompilationOptions::makeCpuOnly(), post_execution_callback_, table_is_temporary(), temporary_tables_, and StorageIOFacility::yieldDeleteCallback().

Referenced by executeRelAlgStep().

2218  {
2219  CHECK(node);
2220  auto timer = DEBUG_TIMER(__func__);
2221 
2222  auto execute_delete_for_node = [this, &co, &eo_in](const auto node,
2223  auto& work_unit,
2224  const bool is_aggregate) {
2225  auto* table_descriptor = node->getModifiedTableDescriptor();
2226  CHECK(table_descriptor);
2227  if (!table_descriptor->hasDeletedCol) {
2228  throw std::runtime_error(
2229  "DELETE queries are only supported on tables with the vacuum attribute set to "
2230  "'delayed'");
2231  }
2232 
2233  const auto catalog = node->getModifiedTableCatalog();
2234  CHECK(catalog);
2235  Executor::clearExternalCaches(false, table_descriptor, catalog->getDatabaseId());
2236 
2237  const auto table_infos = get_table_infos(work_unit.exe_unit, executor_);
2238 
2239  auto execute_delete_ra_exe_unit =
2240  [this, &table_infos, &table_descriptor, &eo_in, &co, catalog](
2241  const auto& exe_unit, const bool is_aggregate) {
2243  std::make_unique<DeleteTransactionParameters>(table_descriptor, *catalog);
2244  auto delete_params = dynamic_cast<DeleteTransactionParameters*>(
2246  CHECK(delete_params);
2247  auto delete_callback = yieldDeleteCallback(*delete_params);
2249 
2250  auto eo = eo_in;
2251  if (dml_transaction_parameters_->tableIsTemporary()) {
2252  eo.output_columnar_hint = true;
2253  co_delete.filter_on_deleted_column =
2254  false; // project the entire delete column for columnar update
2255  } else {
2256  CHECK_EQ(exe_unit.target_exprs.size(), size_t(1));
2257  }
2258 
2259  try {
2260  auto table_update_metadata =
2261  executor_->executeUpdate(exe_unit,
2262  table_infos,
2263  table_descriptor,
2264  co_delete,
2265  eo,
2266  *catalog,
2267  executor_->row_set_mem_owner_,
2268  delete_callback,
2269  is_aggregate);
2270  post_execution_callback_ = [table_update_metadata, this, catalog]() {
2271  dml_transaction_parameters_->finalizeTransaction(*catalog);
2272  TableOptimizer table_optimizer{
2273  dml_transaction_parameters_->getTableDescriptor(), executor_, *catalog};
2274  table_optimizer.vacuumFragmentsAboveMinSelectivity(table_update_metadata);
2275  };
2276  } catch (const QueryExecutionError& e) {
2277  throw std::runtime_error(getErrorMessageFromCode(e.getErrorCode()));
2278  }
2279  };
2280 
2281  if (table_is_temporary(table_descriptor)) {
2282  auto query_rewrite = std::make_unique<QueryRewriter>(table_infos, executor_);
2283  const auto cd = catalog->getDeletedColumn(table_descriptor);
2284  CHECK(cd);
2285  auto delete_column_expr = makeExpr<Analyzer::ColumnVar>(
2286  cd->columnType,
2288  catalog->getDatabaseId(), table_descriptor->tableId, cd->columnId},
2289  0);
2290  const auto rewritten_exe_unit =
2291  query_rewrite->rewriteColumnarDelete(work_unit.exe_unit, delete_column_expr);
2292  execute_delete_ra_exe_unit(rewritten_exe_unit, is_aggregate);
2293  } else {
2294  execute_delete_ra_exe_unit(work_unit.exe_unit, is_aggregate);
2295  }
2296  };
2297 
2298  if (auto compound = dynamic_cast<const RelCompound*>(node)) {
2299  const auto work_unit =
2300  createCompoundWorkUnit(compound, {{}, SortAlgorithm::Default, 0, 0}, eo_in);
2301  execute_delete_for_node(compound, work_unit, compound->isAggregate());
2302  } else if (auto project = dynamic_cast<const RelProject*>(node)) {
2303  auto work_unit =
2304  createProjectWorkUnit(project, {{}, SortAlgorithm::Default, 0, 0}, eo_in);
2305  if (project->isSimple()) {
2306  CHECK_EQ(size_t(1), project->inputCount());
2307  const auto input_ra = project->getInput(0);
2308  if (dynamic_cast<const RelSort*>(input_ra)) {
2309  const auto& input_table =
2310  get_temporary_table(&temporary_tables_, -input_ra->getId());
2311  CHECK(input_table);
2312  work_unit.exe_unit.scan_limit = input_table->rowCount();
2313  }
2314  }
2315  execute_delete_for_node(project, work_unit, false);
2316  } else {
2317  throw std::runtime_error("Unsupported parent node for delete: " +
2318  node->toString(RelRexToStringConfig::defaults()));
2319  }
2320 }
#define CHECK_EQ(x, y)
Definition: Logger.h:301
std::optional< std::function< void()> > post_execution_callback_
int32_t getErrorCode() const
Definition: ErrorHandling.h:55
WorkUnit createProjectWorkUnit(const RelProject *, const SortInfo &, const ExecutionOptions &eo)
StorageIOFacility::UpdateCallback yieldDeleteCallback(DeleteTransactionParameters &delete_parameters)
TemporaryTables temporary_tables_
Driver for running cleanup processes on a table. TableOptimizer provides functions for various cleanu...
WorkUnit createCompoundWorkUnit(const RelCompound *, const SortInfo &, const ExecutionOptions &eo)
const ResultSetPtr & get_temporary_table(const TemporaryTables *temporary_tables, const int table_id)
Definition: Execute.h:225
static CompilationOptions makeCpuOnly(const CompilationOptions &in)
bool table_is_temporary(const TableDescriptor *const td)
static RelRexToStringConfig defaults()
Definition: RelAlgDag.h:49
#define CHECK(condition)
Definition: Logger.h:291
std::vector< InputTableInfo > get_table_infos(const std::vector< InputDescriptor > &input_descs, Executor *executor)
#define DEBUG_TIMER(name)
Definition: Logger.h:411
static void clearExternalCaches(bool for_update, const TableDescriptor *td, const int current_db_id)
Definition: Execute.h:388
static std::string getErrorMessageFromCode(const int32_t error_code)
std::unique_ptr< TransactionParameters > dml_transaction_parameters_
Executor * executor_

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

ExecutionResult RelAlgExecutor::executeFilter ( const RelFilter * filter,
const CompilationOptions & co,
const ExecutionOptions & eo,
RenderInfo * render_info,
const int64_t  queue_time_ms 
)
private

Definition at line 2740 of file RelAlgExecutor.cpp.

References createFilterWorkUnit(), DEBUG_TIMER, Default, executeWorkUnit(), RelAlgNode::getOutputMetainfo(), and ExecutionOptions::just_explain.

Referenced by executeRelAlgStep().

2744  {
2745  auto timer = DEBUG_TIMER(__func__);
2746  const auto work_unit =
2747  createFilterWorkUnit(filter, {{}, SortAlgorithm::Default, 0, 0}, eo.just_explain);
2748  return executeWorkUnit(
2749  work_unit, filter->getOutputMetainfo(), false, co, eo, render_info, queue_time_ms);
2750 }
ExecutionResult executeWorkUnit(const WorkUnit &work_unit, const std::vector< TargetMetaInfo > &targets_meta, const bool is_agg, const CompilationOptions &co_in, const ExecutionOptions &eo_in, RenderInfo *, const int64_t queue_time_ms, const std::optional< size_t > previous_count=std::nullopt)
#define DEBUG_TIMER(name)
Definition: Logger.h:411
const std::vector< TargetMetaInfo > & getOutputMetainfo() const
Definition: RelAlgDag.h:876
WorkUnit createFilterWorkUnit(const RelFilter *, const SortInfo &, const bool just_explain)

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

ExecutionResult RelAlgExecutor::executeLogicalValues ( const RelLogicalValues * logical_values,
const ExecutionOptions & eo 
)
private

Definition at line 2795 of file RelAlgExecutor.cpp.

References CPU, DEBUG_TIMER, executor_, RelLogicalValues::getNumRows(), RelLogicalValues::getTupleType(), kBIGINT, kCOUNT, kNULLT, Projection, query_mem_desc, and RelAlgNode::setOutputMetainfo().

Referenced by executeRelAlgStep().

2797  {
2798  auto timer = DEBUG_TIMER(__func__);
2800  logical_values->getNumRows(),
2802  /*is_table_function=*/false);
2803 
2804  auto tuple_type = logical_values->getTupleType();
2805  for (size_t i = 0; i < tuple_type.size(); ++i) {
2806  auto& target_meta_info = tuple_type[i];
2807  if (target_meta_info.get_type_info().get_type() == kNULLT) {
2808  // replace w/ bigint
2809  tuple_type[i] =
2810  TargetMetaInfo(target_meta_info.get_resname(), SQLTypeInfo(kBIGINT, false));
2811  }
2812  query_mem_desc.addColSlotInfo(
2813  {std::make_tuple(tuple_type[i].get_type_info().get_size(), 8)});
2814  }
2815  logical_values->setOutputMetainfo(tuple_type);
2816 
2817  std::vector<TargetInfo> target_infos;
2818  for (const auto& tuple_type_component : tuple_type) {
2819  target_infos.emplace_back(TargetInfo{false,
2820  kCOUNT,
2821  tuple_type_component.get_type_info(),
2822  SQLTypeInfo(kNULLT, false),
2823  false,
2824  false,
2825  /*is_varlen_projection=*/false});
2826  }
2827 
2828  std::shared_ptr<ResultSet> rs{
2829  ResultSetLogicalValuesBuilder{logical_values,
2830  target_infos,
2833  executor_->getRowSetMemoryOwner(),
2834  executor_}
2835  .build()};
2836 
2837  return {rs, tuple_type};
2838 }
void setOutputMetainfo(std::vector< TargetMetaInfo > targets_metainfo) const
Definition: RelAlgDag.h:861
size_t getNumRows() const
Definition: RelAlgDag.h:2444
const std::vector< TargetMetaInfo > getTupleType() const
Definition: RelAlgDag.h:2400
Definition: sqldefs.h:78
#define DEBUG_TIMER(name)
Definition: Logger.h:411
Executor * executor_

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

ExecutionResult RelAlgExecutor::executeModify ( const RelModify * modify,
const ExecutionOptions & eo 
)
private

Definition at line 2888 of file RelAlgExecutor.cpp.

References CPU, DEBUG_TIMER, executor_, and ExecutionOptions::just_explain.

Referenced by executeRelAlgStep().

2889  {
2890  auto timer = DEBUG_TIMER(__func__);
2891  if (eo.just_explain) {
2892  throw std::runtime_error("EXPLAIN not supported for ModifyTable");
2893  }
2894 
2895  auto rs = std::make_shared<ResultSet>(TargetInfoList{},
2898  executor_->getRowSetMemoryOwner(),
2899  executor_->blockSize(),
2900  executor_->gridSize());
2901 
2902  std::vector<TargetMetaInfo> empty_targets;
2903  return {rs, empty_targets};
2904 }
std::vector< TargetInfo > TargetInfoList
#define DEBUG_TIMER(name)
Definition: Logger.h:411
Executor * executor_

+ Here is the caller graph for this function:

void RelAlgExecutor::executePostExecutionCallback ( )

Definition at line 4290 of file RelAlgExecutor.cpp.

References post_execution_callback_, and VLOG.

4290  {
4292  VLOG(1) << "Running post execution callback.";
4293  (*post_execution_callback_)();
4294  }
4295 }
std::optional< std::function< void()> > post_execution_callback_
#define VLOG(n)
Definition: Logger.h:387
ExecutionResult RelAlgExecutor::executeProject ( const RelProject * project,
const CompilationOptions & co,
const ExecutionOptions & eo,
RenderInfo * render_info,
const int64_t  queue_time_ms,
const std::optional< size_t >  previous_count 
)
private

Definition at line 2370 of file RelAlgExecutor.cpp.

References CHECK, CHECK_EQ, CPU, createProjectWorkUnit(), DEBUG_TIMER, Default, executeWorkUnit(), get_temporary_table(), RelAlgNode::getInput(), RelAlgNode::getOutputMetainfo(), RelAlgNode::inputCount(), RelProject::isSimple(), and temporary_tables_.

Referenced by executeRelAlgStep().

2376  {
2377  auto timer = DEBUG_TIMER(__func__);
2378  auto work_unit = createProjectWorkUnit(project, {{}, SortAlgorithm::Default, 0, 0}, eo);
2379  CompilationOptions co_project = co;
2380  if (project->isSimple()) {
2381  CHECK_EQ(size_t(1), project->inputCount());
2382  const auto input_ra = project->getInput(0);
2383  if (dynamic_cast<const RelSort*>(input_ra)) {
2384  co_project.device_type = ExecutorDeviceType::CPU;
2385  const auto& input_table =
2386  get_temporary_table(&temporary_tables_, -input_ra->getId());
2387  CHECK(input_table);
2388  work_unit.exe_unit.scan_limit =
2389  std::min(input_table->getLimit(), input_table->rowCount());
2390  }
2391  }
2392  return executeWorkUnit(work_unit,
2393  project->getOutputMetainfo(),
2394  false,
2395  co_project,
2396  eo,
2397  render_info,
2398  queue_time_ms,
2399  previous_count);
2400 }
#define CHECK_EQ(x, y)
Definition: Logger.h:301
WorkUnit createProjectWorkUnit(const RelProject *, const SortInfo &, const ExecutionOptions &eo)
TemporaryTables temporary_tables_
const ResultSetPtr & get_temporary_table(const TemporaryTables *temporary_tables, const int table_id)
Definition: Execute.h:225
const RelAlgNode * getInput(const size_t idx) const
Definition: RelAlgDag.h:892
bool isSimple() const
Definition: RelAlgDag.h:1159
ExecutionResult executeWorkUnit(const WorkUnit &work_unit, const std::vector< TargetMetaInfo > &targets_meta, const bool is_agg, const CompilationOptions &co_in, const ExecutionOptions &eo_in, RenderInfo *, const int64_t queue_time_ms, const std::optional< size_t > previous_count=std::nullopt)
#define CHECK(condition)
Definition: Logger.h:291
#define DEBUG_TIMER(name)
Definition: Logger.h:411
const size_t inputCount() const
Definition: RelAlgDag.h:890
const std::vector< TargetMetaInfo > & getOutputMetainfo() const
Definition: RelAlgDag.h:876

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

ExecutionResult RelAlgExecutor::executeRelAlgQuery ( const CompilationOptions & co,
const ExecutionOptions & eo,
const bool  just_explain_plan,
RenderInfo * render_info 
)

Definition at line 573 of file RelAlgExecutor.cpp.

References CHECK, DEBUG_TIMER, executeRelAlgQueryNoRetry(), RenderInfo::forceNonInSitu(), g_allow_cpu_retry, logger::INFO, INJECT_TIMER, RelAlgDag::kBuiltOptimized, LOG, CompilationOptions::makeCpuOnly(), post_execution_callback_, query_dag_, run_benchmark::run_query(), and VLOG.

576  {
577  CHECK(query_dag_);
579  << static_cast<int>(query_dag_->getBuildState());
580 
581  auto timer = DEBUG_TIMER(__func__);
583 
584  auto run_query = [&](const CompilationOptions& co_in) {
585  auto execution_result =
586  executeRelAlgQueryNoRetry(co_in, eo, just_explain_plan, render_info);
587 
588  constexpr bool vlog_result_set_summary{false};
589  if constexpr (vlog_result_set_summary) {
590  VLOG(1) << execution_result.getRows()->summaryToString();
591  }
592 
594  VLOG(1) << "Running post execution callback.";
595  (*post_execution_callback_)();
596  }
597  return execution_result;
598  };
599 
600  try {
601  return run_query(co);
602  } catch (const QueryMustRunOnCpu&) {
603  if (!g_allow_cpu_retry) {
604  throw;
605  }
606  }
607  LOG(INFO) << "Query unable to run in GPU mode, retrying on CPU";
608  auto co_cpu = CompilationOptions::makeCpuOnly(co);
609 
610  if (render_info) {
611  render_info->forceNonInSitu();
612  }
613  return run_query(co_cpu);
614 }
std::optional< std::function< void()> > post_execution_callback_
void forceNonInSitu()
Definition: RenderInfo.cpp:45
#define LOG(tag)
Definition: Logger.h:285
static CompilationOptions makeCpuOnly(const CompilationOptions &in)
#define INJECT_TIMER(DESC)
Definition: measure.h:93
ExecutionResult executeRelAlgQueryNoRetry(const CompilationOptions &co, const ExecutionOptions &eo, const bool just_explain_plan, RenderInfo *render_info)
std::unique_ptr< RelAlgDag > query_dag_
ExecutionResult executeRelAlgQuery(const CompilationOptions &co, const ExecutionOptions &eo, const bool just_explain_plan, RenderInfo *render_info)
#define CHECK(condition)
Definition: Logger.h:291
#define DEBUG_TIMER(name)
Definition: Logger.h:411
bool g_allow_cpu_retry
Definition: Execute.cpp:86
#define VLOG(n)
Definition: Logger.h:387

+ Here is the call graph for this function:

ExecutionResult RelAlgExecutor::executeRelAlgQueryNoRetry ( const CompilationOptions & co,
const ExecutionOptions & eo,
const bool  just_explain_plan,
RenderInfo * render_info 
)
private

Definition at line 616 of file RelAlgExecutor.cpp.

References ExecutionOptions::allow_runtime_query_interrupt, CHECK, cleanupPostExecution(), DEBUG_TIMER, Executor::ERR_INTERRUPTED, executeRelAlgQueryWithFilterPushDown(), executeRelAlgSeq(), executor_, ExecutionOptions::find_push_down_candidates, g_enable_dynamic_watchdog, QueryExecutionError::getErrorCode(), getGlobalQueryHint(), getParsedQueryHint(), getRelAlgDag(), getSubqueries(), INJECT_TIMER, join(), ExecutionOptions::just_calcite_explain, ExecutionOptions::just_explain, ExecutionOptions::just_validate, anonymous_namespace{RelAlgExecutor.cpp}::prepare_for_system_table_execution(), anonymous_namespace{RelAlgExecutor.cpp}::prepare_foreign_table_for_execution(), query_dag_, query_state_, run_benchmark_import::result, gpu_enabled::reverse(), RelAlgDag::setGlobalQueryHints(), setupCaching(), RelRexToStringConfig::skip_input_nodes, gpu_enabled::sort(), timer_start(), timer_stop(), and to_string().

Referenced by executeRelAlgQuery().

619  {
621  auto timer = DEBUG_TIMER(__func__);
622  auto timer_setup = DEBUG_TIMER("Query pre-execution steps");
623 
624  query_dag_->resetQueryExecutionState();
625  const auto& ra = query_dag_->getRootNode();
626 
627  // capture the lock acquistion time
628  auto clock_begin = timer_start();
630  executor_->resetInterrupt();
631  }
632  std::string query_session{""};
633  std::string query_str{"N/A"};
634  std::string query_submitted_time{""};
635  // gather necessary query's info
636  if (query_state_ != nullptr && query_state_->getConstSessionInfo() != nullptr) {
637  query_session = query_state_->getConstSessionInfo()->get_session_id();
638  query_str = query_state_->getQueryStr();
639  query_submitted_time = query_state_->getQuerySubmittedTime();
640  }
641 
642  auto validate_or_explain_query =
643  just_explain_plan || eo.just_validate || eo.just_explain || eo.just_calcite_explain;
644  auto interruptable = !render_info && !query_session.empty() &&
645  eo.allow_runtime_query_interrupt && !validate_or_explain_query;
646  if (interruptable) {
647  // if we reach here, the current query which was waiting for an idle executor
648  // within the dispatch queue is now scheduled to the specific executor
649  // (not UNITARY_EXECUTOR)
650  // so we update the query session's status with the executor that takes this query
651  std::tie(query_session, query_str) = executor_->attachExecutorToQuerySession(
652  query_session, query_str, query_submitted_time);
653 
654  // now the query is going to be executed, so update the status as
655  // "RUNNING_QUERY_KERNEL"
656  executor_->updateQuerySessionStatus(
657  query_session,
658  query_submitted_time,
659  QuerySessionStatus::QueryStatus::RUNNING_QUERY_KERNEL);
660  }
661 
662  // so it should clean up the session info after finishing its execution
663  ScopeGuard clearQuerySessionInfo =
664  [this, &query_session, &interruptable, &query_submitted_time] {
665  // reset the runtime query interrupt status after the end of query execution
666  if (interruptable) {
667  // cleanup running session's info
668  executor_->clearQuerySessionStatus(query_session, query_submitted_time);
669  }
670  };
671 
672  auto acquire_execute_mutex = [](Executor * executor) -> auto {
673  auto ret = executor->acquireExecuteMutex();
674  return ret;
675  };
676  // now we acquire executor lock in here to make sure that this executor holds
677  // all necessary resources and at the same time protect them against other executors
678  auto lock = acquire_execute_mutex(executor_);
679 
680  if (interruptable) {
681  // check whether this query session is "already" interrupted
682  // this case occurs when there is a very short gap between being interrupted and
683  // taking the execute lock
684  // if so, we have to remove "all" queries initiated by this session, which we do here
685  // without running the query
686  try {
687  executor_->checkPendingQueryStatus(query_session);
688  } catch (QueryExecutionError& e) {
690  throw std::runtime_error("Query execution has been interrupted (pending query)");
691  }
692  throw e;
693  } catch (...) {
694  throw std::runtime_error("Checking pending query status failed: unknown error");
695  }
696  }
697  int64_t queue_time_ms = timer_stop(clock_begin);
698 
700 
701  // Notify foreign tables to load prior to caching
703 
704  ScopeGuard row_set_holder = [this] { cleanupPostExecution(); };
705  setupCaching(&ra);
706 
707  ScopeGuard restore_metainfo_cache = [this] { executor_->clearMetaInfoCache(); };
708  auto ed_seq = RaExecutionSequence(&ra, executor_);
709 
710  if (just_explain_plan) {
711  std::stringstream ss;
712  std::vector<const RelAlgNode*> nodes;
713  for (size_t i = 0; i < ed_seq.size(); i++) {
714  nodes.emplace_back(ed_seq.getDescriptor(i)->getBody());
715  }
716  size_t ctr_node_id_in_plan = nodes.size();
717  for (auto& body : boost::adaptors::reverse(nodes)) {
718  // we set each node's id in the query plan in advance before calling toString
719  // method to properly represent the query plan
720  auto node_id_in_plan_tree = ctr_node_id_in_plan--;
721  body->setIdInPlanTree(node_id_in_plan_tree);
722  }
723  size_t ctr = nodes.size();
724  size_t tab_ctr = 0;
725  RelRexToStringConfig config;
726  config.skip_input_nodes = true;
727  for (auto& body : boost::adaptors::reverse(nodes)) {
728  const auto index = ctr--;
729  const auto tabs = std::string(tab_ctr++, '\t');
730  CHECK(body);
731  ss << tabs << std::to_string(index) << " : " << body->toString(config) << "\n";
732  if (auto sort = dynamic_cast<const RelSort*>(body)) {
733  ss << tabs << " : " << sort->getInput(0)->toString(config) << "\n";
734  }
735  if (dynamic_cast<const RelProject*>(body) ||
736  dynamic_cast<const RelCompound*>(body)) {
737  if (auto join = dynamic_cast<const RelLeftDeepInnerJoin*>(body->getInput(0))) {
738  ss << tabs << " : " << join->toString(config) << "\n";
739  }
740  }
741  }
742  const auto& subqueries = getSubqueries();
743  if (!subqueries.empty()) {
744  ss << "Subqueries: "
745  << "\n";
746  for (const auto& subquery : subqueries) {
747  const auto ra = subquery->getRelAlg();
748  ss << "\t" << ra->toString(config) << "\n";
749  }
750  }
751  auto rs = std::make_shared<ResultSet>(ss.str());
752  return {rs, {}};
753  }
754 
755  if (eo.find_push_down_candidates) {
756  // this extra logic is mainly due to current limitations on multi-step queries
757  // and/or subqueries.
759  ed_seq, co, eo, render_info, queue_time_ms);
760  }
761  timer_setup.stop();
762 
763  // Dispatch the subqueries first
764  const auto global_hints = getGlobalQueryHint();
765  for (auto subquery : getSubqueries()) {
766  const auto subquery_ra = subquery->getRelAlg();
767  CHECK(subquery_ra);
768  if (subquery_ra->hasContextData()) {
769  continue;
770  }
771  // Execute the subquery and cache the result.
772  RelAlgExecutor subquery_executor(executor_, query_state_);
773  // Propagate global and local query hint if necessary
774  const auto local_hints = getParsedQueryHint(subquery_ra);
775  if (global_hints || local_hints) {
776  const auto subquery_rel_alg_dag = subquery_executor.getRelAlgDag();
777  if (global_hints) {
778  subquery_rel_alg_dag->setGlobalQueryHints(*global_hints);
779  }
780  if (local_hints) {
781  subquery_rel_alg_dag->registerQueryHint(subquery_ra, *local_hints);
782  }
783  }
784  RaExecutionSequence subquery_seq(subquery_ra, executor_);
785  auto result = subquery_executor.executeRelAlgSeq(subquery_seq, co, eo, nullptr, 0);
786  subquery->setExecutionResult(std::make_shared<ExecutionResult>(result));
787  }
788  return executeRelAlgSeq(ed_seq, co, eo, render_info, queue_time_ms);
789 }
int32_t getErrorCode() const
Definition: ErrorHandling.h:55
static const int32_t ERR_INTERRUPTED
Definition: Execute.h:1436
ExecutionResult executeRelAlgSeq(const RaExecutionSequence &seq, const CompilationOptions &co, const ExecutionOptions &eo, RenderInfo *render_info, const int64_t queue_time_ms, const bool with_existing_temp_tables=false)
ExecutionResult executeRelAlgQueryWithFilterPushDown(const RaExecutionSequence &seq, const CompilationOptions &co, const ExecutionOptions &eo, RenderInfo *render_info, const int64_t queue_time_ms)
void setupCaching(const RelAlgNode *ra)
std::string join(T const &container, std::string const &delim)
DEVICE void sort(ARGS &&...args)
Definition: gpu_enabled.h:105
TypeR::rep timer_stop(Type clock_begin)
Definition: measure.h:48
bool g_enable_dynamic_watchdog
Definition: Execute.cpp:80
std::optional< RegisteredQueryHint > getParsedQueryHint(const RelAlgNode *node)
const std::vector< std::shared_ptr< RexSubQuery > > & getSubqueries() const noexcept
std::string to_string(char const *&&v)
#define INJECT_TIMER(DESC)
Definition: measure.h:93
A container for relational algebra descriptors defining the execution order for a relational algebra ...
ExecutionResult executeRelAlgQueryNoRetry(const CompilationOptions &co, const ExecutionOptions &eo, const bool just_explain_plan, RenderInfo *render_info)
std::optional< RegisteredQueryHint > getGlobalQueryHint()
std::unique_ptr< RelAlgDag > query_dag_
void prepare_foreign_table_for_execution(const RelAlgNode &ra_node)
std::shared_ptr< const query_state::QueryState > query_state_
void prepare_for_system_table_execution(const RelAlgNode &ra_node, const CompilationOptions &co)
#define CHECK(condition)
Definition: Logger.h:291
#define DEBUG_TIMER(name)
Definition: Logger.h:411
void cleanupPostExecution()
DEVICE void reverse(ARGS &&...args)
Definition: gpu_enabled.h:96
Executor * executor_
Type timer_start()
Definition: measure.h:42

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

QueryStepExecutionResult RelAlgExecutor::executeRelAlgQuerySingleStep ( const RaExecutionSequence & seq,
const size_t  step_idx,
const CompilationOptions & co,
const ExecutionOptions & eo,
RenderInfo * render_info 
)

Definition at line 841 of file RelAlgExecutor.cpp.

References CHECK, CHECK_EQ, anonymous_namespace{RelAlgExecutor.cpp}::check_sort_node_source_constraint(), CPU, createSortInputWorkUnit(), CompilationOptions::device_type, executeRelAlgSubSeq(), anonymous_namespace{RelAlgExecutor.cpp}::get_order_entries(), RaExecutionSequence::getDescriptor(), GPU, logger::INFO, INJECT_TIMER, ExecutionOptions::just_validate, LOG, anonymous_namespace{RelAlgExecutor.cpp}::node_is_aggregate(), post_execution_callback_, queue_time_ms_, Reduce, GroupByAndAggregate::shard_count_for_top_groups(), gpu_enabled::sort(), Union, and VLOG.

846  {
847  INJECT_TIMER(executeRelAlgQueryStep);
848 
849  auto exe_desc_ptr = seq.getDescriptor(step_idx);
850  CHECK(exe_desc_ptr);
851  const auto sort = dynamic_cast<const RelSort*>(exe_desc_ptr->getBody());
852 
853  size_t shard_count{0};
854  auto merge_type = [&shard_count](const RelAlgNode* body) -> MergeType {
855  return node_is_aggregate(body) && !shard_count ? MergeType::Reduce : MergeType::Union;
856  };
857 
858  if (sort) {
860  auto order_entries = get_order_entries(sort);
861  const auto source_work_unit = createSortInputWorkUnit(sort, order_entries, eo);
862  shard_count =
863  GroupByAndAggregate::shard_count_for_top_groups(source_work_unit.exe_unit);
864  if (!shard_count) {
865  // No point in sorting on the leaf, only execute the input to the sort node.
866  CHECK_EQ(size_t(1), sort->inputCount());
867  const auto source = sort->getInput(0);
868  if (sort->collationCount() || node_is_aggregate(source)) {
869  auto temp_seq = RaExecutionSequence(std::make_unique<RaExecutionDesc>(source));
870  CHECK_EQ(temp_seq.size(), size_t(1));
871  ExecutionOptions eo_copy = eo;
872  eo_copy.just_validate = eo.just_validate || sort->isEmptyResult();
873  // Use subseq to avoid clearing existing temporary tables
874  return {
875  executeRelAlgSubSeq(temp_seq, std::make_pair(0, 1), co, eo_copy, nullptr, 0),
876  merge_type(source),
877  source->getId(),
878  false};
879  }
880  }
881  }
882  QueryStepExecutionResult query_step_result{ExecutionResult{},
883  merge_type(exe_desc_ptr->getBody()),
884  exe_desc_ptr->getBody()->getId(),
885  false};
886  try {
887  query_step_result.result = executeRelAlgSubSeq(
888  seq, std::make_pair(step_idx, step_idx + 1), co, eo, render_info, queue_time_ms_);
889  } catch (QueryMustRunOnCpu const& e) {
891  auto copied_co = co;
893  LOG(INFO) << "Retry the query via CPU mode";
894  query_step_result.result = executeRelAlgSubSeq(seq,
895  std::make_pair(step_idx, step_idx + 1),
896  copied_co,
897  eo,
898  render_info,
900  }
902  VLOG(1) << "Running post execution callback.";
903  (*post_execution_callback_)();
904  }
905  return query_step_result;
906 }
int64_t queue_time_ms_
#define CHECK_EQ(x, y)
Definition: Logger.h:301
std::optional< std::function< void()> > post_execution_callback_
RaExecutionDesc * getDescriptor(size_t idx) const
#define LOG(tag)
Definition: Logger.h:285
DEVICE void sort(ARGS &&...args)
Definition: gpu_enabled.h:105
void check_sort_node_source_constraint(const RelSort *sort)
#define INJECT_TIMER(DESC)
Definition: measure.h:93
MergeType
A container for relational algebra descriptors defining the execution order for a relational algebra ...
ExecutionResult executeRelAlgSubSeq(const RaExecutionSequence &seq, const std::pair< size_t, size_t > interval, const CompilationOptions &co, const ExecutionOptions &eo, RenderInfo *render_info, const int64_t queue_time_ms)
ExecutorDeviceType device_type
bool node_is_aggregate(const RelAlgNode *ra)
static size_t shard_count_for_top_groups(const RelAlgExecutionUnit &ra_exe_unit)
#define CHECK(condition)
Definition: Logger.h:291
std::list< Analyzer::OrderEntry > get_order_entries(const RelSort *sort)
#define VLOG(n)
Definition: Logger.h:387
WorkUnit createSortInputWorkUnit(const RelSort *, std::list< Analyzer::OrderEntry > &order_entries, const ExecutionOptions &eo)

+ Here is the call graph for this function:

ExecutionResult RelAlgExecutor::executeRelAlgQueryWithFilterPushDown ( const RaExecutionSequence & seq,
const CompilationOptions & co,
const ExecutionOptions & eo,
RenderInfo * render_info,
const int64_t  queue_time_ms 
)

Definition at line 148 of file JoinFilterPushDown.cpp.

References CHECK, executeRelAlgSeq(), executor_, ExecutionOptions::find_push_down_candidates, getSubqueries(), ExecutionOptions::just_calcite_explain, run_benchmark_import::result, and RaExecutionSequence::size().

Referenced by executeRelAlgQueryNoRetry().

153  {
154  // we currently do not fully support filter push down with
155  // multi-step execution and/or with subqueries
156  // TODO(Saman): add proper caching to enable filter push down for all cases
157  const auto& subqueries = getSubqueries();
158  if (seq.size() > 1 || !subqueries.empty()) {
159  if (eo.just_calcite_explain) {
160  return ExecutionResult(std::vector<PushedDownFilterInfo>{},
162  }
163  auto eo_modified = eo;
164  eo_modified.find_push_down_candidates = false;
165  eo_modified.just_calcite_explain = false;
166 
167  // Dispatch the subqueries first
168  for (auto subquery : subqueries) {
169  // Execute the subquery and cache the result.
170  RelAlgExecutor ra_executor(executor_);
171  const auto subquery_ra = subquery->getRelAlg();
172  CHECK(subquery_ra);
173  RaExecutionSequence subquery_seq(subquery_ra, executor_);
174  auto result =
175  ra_executor.executeRelAlgSeq(subquery_seq, co, eo_modified, nullptr, 0);
176  subquery->setExecutionResult(std::make_shared<ExecutionResult>(result));
177  }
178  return executeRelAlgSeq(seq, co, eo_modified, render_info, queue_time_ms);
179  }
180  // else
181  return executeRelAlgSeq(seq, co, eo, render_info, queue_time_ms);
182 }
ExecutionResult executeRelAlgSeq(const RaExecutionSequence &seq, const CompilationOptions &co, const ExecutionOptions &eo, RenderInfo *render_info, const int64_t queue_time_ms, const bool with_existing_temp_tables=false)
const std::vector< std::shared_ptr< RexSubQuery > > & getSubqueries() const noexcept
A container for relational algebra descriptors defining the execution order for a relational algebra ...
#define CHECK(condition)
Definition: Logger.h:291
Executor * executor_

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

ExecutionResult RelAlgExecutor::executeRelAlgSeq ( const RaExecutionSequence & seq,
const CompilationOptions & co,
const ExecutionOptions & eo,
RenderInfo * render_info,
const int64_t  queue_time_ms,
const bool  with_existing_temp_tables = false 
)

Definition at line 925 of file RelAlgExecutor.cpp.

References addTemporaryTable(), CHECK, CHECK_GE, DEBUG_TIMER, CompilationOptions::device_type, RaExecutionSequence::empty(), executeRelAlgStep(), executor_, RenderInfo::forceNonInSitu(), g_allow_query_step_cpu_retry, g_allow_query_step_skipping, g_cluster, g_enable_interop, RaExecutionDesc::getBody(), RaExecutionSequence::getDescriptor(), RaExecutionSequence::getSkippedQueryStepCacheKeys(), GPU, RaExecutionSequence::hasQueryStepForUnion(), logger::INFO, INJECT_TIMER, ExecutionOptions::just_explain, ExecutionOptions::keep_result, left_deep_join_info_, LOG, CompilationOptions::makeCpuOnly(), now_, RaExecutionSequence::size(), gpu_enabled::swap(), target_exprs_owned_, temporary_tables_, and VLOG.

Referenced by executeRelAlgQueryNoRetry(), and executeRelAlgQueryWithFilterPushDown().

930  {
932  auto timer = DEBUG_TIMER(__func__);
933  if (!with_existing_temp_tables) {
935  }
938  executor_->temporary_tables_ = &temporary_tables_;
939 
940  time(&now_);
941  CHECK(!seq.empty());
942 
943  auto get_descriptor_count = [&seq, &eo]() -> size_t {
944  if (eo.just_explain) {
945  if (dynamic_cast<const RelLogicalValues*>(seq.getDescriptor(0)->getBody())) {
946  // run the logical values descriptor to generate the result set, then the next
947  // descriptor to generate the explain
948  CHECK_GE(seq.size(), size_t(2));
949  return 2;
950  } else {
951  return 1;
952  }
953  } else {
954  return seq.size();
955  }
956  };
957 
958  const auto exec_desc_count = get_descriptor_count();
959  auto eo_copied = eo;
960  if (seq.hasQueryStepForUnion()) {
961  // we currently do not support resultset recycling when an input query
962  // contains union (all) operation
963  eo_copied.keep_result = false;
964  }
965 
966  // we have to register resultset(s) of the skipped query step(s) as temporary table
967  // before executing the remaining query steps
968  // since they may be required during the query processing
969  // i.e., get metadata of the target expression from the skipped query step
971  for (const auto& kv : seq.getSkippedQueryStepCacheKeys()) {
972  const auto cached_res =
973  executor_->getRecultSetRecyclerHolder().getCachedQueryResultSet(kv.second);
974  CHECK(cached_res);
975  addTemporaryTable(kv.first, cached_res);
976  }
977  }
978 
979  const auto num_steps = exec_desc_count - 1;
980  for (size_t i = 0; i < exec_desc_count; i++) {
981  VLOG(1) << "Executing query step " << i << " / " << num_steps;
982  try {
984  seq, i, co, eo_copied, (i == num_steps) ? render_info : nullptr, queue_time_ms);
985  } catch (const QueryMustRunOnCpu&) {
986  // Do not allow per-step retry if flag is off or in distributed mode
987  // TODO(todd): Determine if and when we can relax this restriction
988  // for distributed
991  throw;
992  }
993  LOG(INFO) << "Retrying current query step " << i << " / " << num_steps << " on CPU";
994  const auto co_cpu = CompilationOptions::makeCpuOnly(co);
995  if (render_info && i == num_steps) {
996  // only render on the last step
997  render_info->forceNonInSitu();
998  }
999  executeRelAlgStep(seq,
1000  i,
1001  co_cpu,
1002  eo_copied,
1003  (i == num_steps) ? render_info : nullptr,
1004  queue_time_ms);
1005  } catch (const NativeExecutionError&) {
1006  if (!g_enable_interop) {
1007  throw;
1008  }
1009  auto eo_extern = eo_copied;
1010  eo_extern.executor_type = ::ExecutorType::Extern;
1011  auto exec_desc_ptr = seq.getDescriptor(i);
1012  const auto body = exec_desc_ptr->getBody();
1013  const auto compound = dynamic_cast<const RelCompound*>(body);
1014  if (compound && (compound->getGroupByCount() || compound->isAggregate())) {
1015  LOG(INFO) << "Also failed to run the query using interoperability";
1016  throw;
1017  }
1019  seq, i, co, eo_extern, (i == num_steps) ? render_info : nullptr, queue_time_ms);
1020  }
1021  }
1022 
1023  return seq.getDescriptor(num_steps)->getResult();
1024 }
RaExecutionDesc * getDescriptor(size_t idx) const
ExecutionResult executeRelAlgSeq(const RaExecutionSequence &seq, const CompilationOptions &co, const ExecutionOptions &eo, RenderInfo *render_info, const int64_t queue_time_ms, const bool with_existing_temp_tables=false)
void forceNonInSitu()
Definition: RenderInfo.cpp:45
const bool hasQueryStepForUnion() const
#define LOG(tag)
Definition: Logger.h:285
bool g_allow_query_step_skipping
Definition: Execute.cpp:151
const std::unordered_map< int, QueryPlanHash > getSkippedQueryStepCacheKeys() const
TemporaryTables temporary_tables_
#define CHECK_GE(x, y)
Definition: Logger.h:306
void addTemporaryTable(const int table_id, const ResultSetPtr &result)
bool g_enable_interop
std::vector< std::shared_ptr< Analyzer::Expr > > target_exprs_owned_
static CompilationOptions makeCpuOnly(const CompilationOptions &in)
#define INJECT_TIMER(DESC)
Definition: measure.h:93
void executeRelAlgStep(const RaExecutionSequence &seq, const size_t step_idx, const CompilationOptions &, const ExecutionOptions &, RenderInfo *, const int64_t queue_time_ms)
ExecutorDeviceType device_type
std::unordered_map< unsigned, JoinQualsPerNestingLevel > left_deep_join_info_
#define CHECK(condition)
Definition: Logger.h:291
#define DEBUG_TIMER(name)
Definition: Logger.h:411
const RelAlgNode * getBody() const
bool g_allow_query_step_cpu_retry
Definition: Execute.cpp:87
bool g_cluster
Executor * executor_
DEVICE void swap(ARGS &&...args)
Definition: gpu_enabled.h:114
#define VLOG(n)
Definition: Logger.h:387

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void RelAlgExecutor::executeRelAlgStep ( const RaExecutionSequence seq,
const size_t  step_idx,
const CompilationOptions co,
const ExecutionOptions eo,
RenderInfo render_info,
const int64_t  queue_time_ms 
)
private

Definition at line 1071 of file RelAlgExecutor.cpp.

References addTemporaryTable(), ExecutionOptions::allow_loop_joins, anonymous_namespace{RelAlgExecutor.cpp}::build_render_targets(), canUseResultsetCache(), CHECK, CPU, DEBUG_TIMER, RelRexToStringConfig::defaults(), CompilationOptions::device_type, ExecutionOptions::dynamic_watchdog_time_limit, executeAggregate(), executeCompound(), executeDelete(), executeFilter(), executeLogicalValues(), executeModify(), executeProject(), executeSort(), executeTableFunction(), executeUnion(), executeUpdate(), executor_, logger::FATAL, g_cluster, g_enable_data_recycler, g_skip_intermediate_count, g_use_query_resultset_cache, RelAlgNode::getContextData(), RaExecutionSequence::getDescriptor(), RaExecutionSequence::getDescriptorByBodyId(), RelAlgNode::getId(), RelAlgNode::getInput(), getParsedQueryHint(), handleNop(), anonymous_namespace{RelAlgExecutor.cpp}::has_valid_query_plan_dag(), RelAlgNode::hasContextData(), RaExecutionSequence::hasQueryStepForUnion(), logger::INFO, INJECT_TIMER, kAllowLoopJoin, kColumnarOutput, kCpuMode, kCudaBlockSize, kCudaGridSize, kDisableLoopJoin, kDynamicWatchdog, kDynamicWatchdogOff, ExecutionOptions::keep_result, kKeepResult, kKeepTableFuncResult, kMaxJoinHashTableSize, kOptCudaBlockAndGridSizes, kQueryTimeLimit, kRowwiseOutput, kWatchdog, kWatchdogOff, LOG, ExecutionOptions::max_join_hash_table_size, ExecutionOptions::optimize_cuda_block_and_grid_sizes, ExecutionOptions::outer_fragment_indices, ExecutionOptions::output_columnar_hint, setHasStepForUnion(), VLOG, ExecutionOptions::with_dynamic_watchdog, and ExecutionOptions::with_watchdog.

Referenced by executeRelAlgSeq(), and executeRelAlgSubSeq().

1076  {
1078  auto timer = DEBUG_TIMER(__func__);
1079  auto exec_desc_ptr = seq.getDescriptor(step_idx);
1080  CHECK(exec_desc_ptr);
1081  auto& exec_desc = *exec_desc_ptr;
1082  const auto body = exec_desc.getBody();
1083  if (body->isNop()) {
1084  handleNop(exec_desc);
1085  return;
1086  }
1087  ExecutionOptions eo_work_unit = eo;
1088  eo_work_unit.with_watchdog =
1089  eo.with_watchdog && (step_idx == 0 || dynamic_cast<const RelProject*>(body));
1090  eo_work_unit.outer_fragment_indices =
1091  step_idx == 0 ? eo.outer_fragment_indices : std::vector<size_t>();
1092 
1093  auto handle_hint = [co,
1094  eo_work_unit,
1095  body,
1096  this]() -> std::pair<CompilationOptions, ExecutionOptions> {
1097  ExecutionOptions eo_hint_applied = eo_work_unit;
1098  CompilationOptions co_hint_applied = co;
1099  auto target_node = body;
1100  if (auto sort_body = dynamic_cast<const RelSort*>(body)) {
1101  target_node = sort_body->getInput(0);
1102  }
1103  auto query_hints = getParsedQueryHint(target_node);
1104  auto columnar_output_hint_enabled = false;
1105  auto rowwise_output_hint_enabled = false;
1106  if (query_hints) {
1107  if (query_hints->isHintRegistered(QueryHint::kCpuMode)) {
1108  VLOG(1) << "A user forces to run the query on the CPU execution mode";
1109  co_hint_applied.device_type = ExecutorDeviceType::CPU;
1110  }
1111  if (query_hints->isHintRegistered(QueryHint::kKeepResult)) {
1112  if (!g_enable_data_recycler) {
1113  VLOG(1) << "A user enables keeping query resultset but is skipped since data "
1114  "recycler is disabled";
1115  }
1117  VLOG(1) << "A user enables keeping query resultset but is skipped since query "
1118  "resultset recycler is disabled";
1119  } else {
1120  VLOG(1) << "A user enables keeping query resultset";
1121  eo_hint_applied.keep_result = true;
1122  }
1123  }
1124  if (query_hints->isHintRegistered(QueryHint::kKeepTableFuncResult)) {
1125  // we use this hint within the function 'executeTableFunction`
1126  if (!g_enable_data_recycler) {
1127  VLOG(1) << "A user enables keeping table function's resultset but is skipped "
1128  "since data recycler is disabled";
1129  }
1131  VLOG(1) << "A user enables keeping table function's resultset but is skipped "
1132  "since query resultset recycler is disabled";
1133  } else {
1134  VLOG(1) << "A user enables keeping table function's resultset";
1135  eo_hint_applied.keep_result = true;
1136  }
1137  }
1138  if (query_hints->isHintRegistered(QueryHint::kWatchdog)) {
1139  if (!eo_hint_applied.with_watchdog) {
1140  VLOG(1) << "A user enables watchdog for this query";
1141  eo_hint_applied.with_watchdog = true;
1142  }
1143  }
1144  if (query_hints->isHintRegistered(QueryHint::kWatchdogOff)) {
1145  if (eo_hint_applied.with_watchdog) {
1146  VLOG(1) << "A user disables watchdog for this query";
1147  eo_hint_applied.with_watchdog = false;
1148  }
1149  }
1150  if (query_hints->isHintRegistered(QueryHint::kDynamicWatchdog)) {
1151  if (!eo_hint_applied.with_dynamic_watchdog) {
1152  VLOG(1) << "A user enables dynamic watchdog for this query";
1153  eo_hint_applied.with_watchdog = true;
1154  }
1155  }
1156  if (query_hints->isHintRegistered(QueryHint::kDynamicWatchdogOff)) {
1157  if (eo_hint_applied.with_dynamic_watchdog) {
1158  VLOG(1) << "A user disables dynamic watchdog for this query";
1159  eo_hint_applied.with_watchdog = false;
1160  }
1161  }
1162  if (query_hints->isHintRegistered(QueryHint::kQueryTimeLimit)) {
1163  std::ostringstream oss;
1164  oss << "A user sets query time limit to " << query_hints->query_time_limit
1165  << " ms";
1166  eo_hint_applied.dynamic_watchdog_time_limit = query_hints->query_time_limit;
1167  if (!eo_hint_applied.with_dynamic_watchdog) {
1168  eo_hint_applied.with_dynamic_watchdog = true;
1169  oss << " (and system automatically enables dynamic watchdog to activate the "
1170  "given \"query_time_limit\" hint)";
1171  }
1172  VLOG(1) << oss.str();
1173  }
1174  if (query_hints->isHintRegistered(QueryHint::kAllowLoopJoin)) {
1175  VLOG(1) << "A user enables loop join";
1176  eo_hint_applied.allow_loop_joins = true;
1177  }
1178  if (query_hints->isHintRegistered(QueryHint::kDisableLoopJoin)) {
1179  VLOG(1) << "A user disables loop join";
1180  eo_hint_applied.allow_loop_joins = false;
1181  }
1182  if (query_hints->isHintRegistered(QueryHint::kMaxJoinHashTableSize)) {
1183  eo_hint_applied.max_join_hash_table_size = query_hints->max_join_hash_table_size;
1184  VLOG(1) << "A user forces the maximum size of a join hash table as "
1185  << eo_hint_applied.max_join_hash_table_size << " bytes";
1186  }
1187  if (query_hints->isHintRegistered(QueryHint::kOptCudaBlockAndGridSizes)) {
1188  if (query_hints->isHintRegistered(QueryHint::kCudaGridSize) ||
1189  query_hints->isHintRegistered(QueryHint::kCudaBlockSize)) {
1190  VLOG(1) << "Skip query hint \"opt_cuda_grid_and_block_size\" when at least one "
1191  "of the following query hints are given simultaneously: "
1192  "\"cuda_block_size\" and \"cuda_grid_size_multiplier\"";
1193  } else {
1194  VLOG(1) << "A user enables optimization of cuda block and grid sizes";
1195  eo_hint_applied.optimize_cuda_block_and_grid_sizes = true;
1196  }
1197  }
1198  if (query_hints->isHintRegistered(QueryHint::kColumnarOutput)) {
1199  VLOG(1) << "A user forces the query to run with columnar output";
1200  columnar_output_hint_enabled = true;
1201  } else if (query_hints->isHintRegistered(QueryHint::kRowwiseOutput)) {
1202  VLOG(1) << "A user forces the query to run with rowwise output";
1203  rowwise_output_hint_enabled = true;
1204  }
1205  }
1206  auto columnar_output_enabled = eo_work_unit.output_columnar_hint
1207  ? !rowwise_output_hint_enabled
1208  : columnar_output_hint_enabled;
1209  if (g_cluster && (columnar_output_hint_enabled || rowwise_output_hint_enabled)) {
1210  LOG(INFO) << "Currently, we do not support applying query hint to change query "
1211  "output layout in distributed mode.";
1212  }
1213  eo_hint_applied.output_columnar_hint = columnar_output_enabled;
1214  return std::make_pair(co_hint_applied, eo_hint_applied);
1215  };
1216 
1217  auto hint_applied = handle_hint();
1219 
1220  if (canUseResultsetCache(eo, render_info) && has_valid_query_plan_dag(body)) {
1221  if (auto cached_resultset =
1222  executor_->getRecultSetRecyclerHolder().getCachedQueryResultSet(
1223  body->getQueryPlanDagHash())) {
1224  VLOG(1) << "recycle resultset of the root node " << body->getRelNodeDagId()
1225  << " from resultset cache";
1226  body->setOutputMetainfo(cached_resultset->getTargetMetaInfo());
1227  if (render_info) {
1228  std::vector<std::shared_ptr<Analyzer::Expr>>& cached_target_exprs =
1229  executor_->getRecultSetRecyclerHolder().getTargetExprs(
1230  body->getQueryPlanDagHash());
1231  std::vector<Analyzer::Expr*> copied_target_exprs;
1232  for (const auto& expr : cached_target_exprs) {
1233  copied_target_exprs.push_back(expr.get());
1234  }
1236  *render_info, copied_target_exprs, cached_resultset->getTargetMetaInfo());
1237  }
1238  exec_desc.setResult({cached_resultset, cached_resultset->getTargetMetaInfo()});
1239  addTemporaryTable(-body->getId(), exec_desc.getResult().getDataPtr());
1240  return;
1241  }
1242  }
1243 
1244  const auto compound = dynamic_cast<const RelCompound*>(body);
1245  if (compound) {
1246  if (compound->isDeleteViaSelect()) {
1247  executeDelete(compound, hint_applied.first, hint_applied.second, queue_time_ms);
1248  } else if (compound->isUpdateViaSelect()) {
1249  executeUpdate(compound, hint_applied.first, hint_applied.second, queue_time_ms);
1250  } else {
1251  exec_desc.setResult(executeCompound(
1252  compound, hint_applied.first, hint_applied.second, render_info, queue_time_ms));
1253  VLOG(3) << "Returned from executeCompound(), addTemporaryTable("
1254  << static_cast<int>(-compound->getId()) << ", ...)"
1255  << " exec_desc.getResult().getDataPtr()->rowCount()="
1256  << exec_desc.getResult().getDataPtr()->rowCount();
1257  if (exec_desc.getResult().isFilterPushDownEnabled()) {
1258  return;
1259  }
1260  addTemporaryTable(-compound->getId(), exec_desc.getResult().getDataPtr());
1261  }
1262  return;
1263  }
1264  const auto project = dynamic_cast<const RelProject*>(body);
1265  if (project) {
1266  if (project->isDeleteViaSelect()) {
1267  executeDelete(project, hint_applied.first, hint_applied.second, queue_time_ms);
1268  } else if (project->isUpdateViaSelect()) {
1269  executeUpdate(project, hint_applied.first, hint_applied.second, queue_time_ms);
1270  } else {
1271  std::optional<size_t> prev_count;
1272  // Disabling the intermediate count optimization in distributed, as the previous
1273  // execution descriptor will likely not hold the aggregated result.
1274  if (g_skip_intermediate_count && step_idx > 0 && !g_cluster) {
1275  // If the previous node produced a reliable count, skip the pre-flight count.
1276  RelAlgNode const* const prev_body = project->getInput(0);
1277  if (shared::dynamic_castable_to_any<RelCompound, RelLogicalValues>(prev_body)) {
1278  if (RaExecutionDesc const* const prev_exec_desc =
1279  prev_body->hasContextData()
1280  ? prev_body->getContextData()
1281  : seq.getDescriptorByBodyId(prev_body->getId(), step_idx - 1)) {
1282  const auto& prev_exe_result = prev_exec_desc->getResult();
1283  const auto prev_result = prev_exe_result.getRows();
1284  if (prev_result) {
1285  prev_count = prev_result->rowCount();
1286  VLOG(3) << "Setting output row count for projection node to previous node ("
1287  << prev_exec_desc->getBody()->toString(
1289  << ") to " << *prev_count;
1290  }
1291  }
1292  }
1293  }
1294  exec_desc.setResult(executeProject(project,
1295  hint_applied.first,
1296  hint_applied.second,
1297  render_info,
1298  queue_time_ms,
1299  prev_count));
1300  VLOG(3) << "Returned from executeProject(), addTemporaryTable("
1301  << static_cast<int>(-project->getId()) << ", ...)"
1302  << " exec_desc.getResult().getDataPtr()->rowCount()="
1303  << exec_desc.getResult().getDataPtr()->rowCount();
1304  if (exec_desc.getResult().isFilterPushDownEnabled()) {
1305  return;
1306  }
1307  addTemporaryTable(-project->getId(), exec_desc.getResult().getDataPtr());
1308  }
1309  return;
1310  }
1311  const auto aggregate = dynamic_cast<const RelAggregate*>(body);
1312  if (aggregate) {
1313  exec_desc.setResult(executeAggregate(
1314  aggregate, hint_applied.first, hint_applied.second, render_info, queue_time_ms));
1315  addTemporaryTable(-aggregate->getId(), exec_desc.getResult().getDataPtr());
1316  return;
1317  }
1318  const auto filter = dynamic_cast<const RelFilter*>(body);
1319  if (filter) {
1320  exec_desc.setResult(executeFilter(
1321  filter, hint_applied.first, hint_applied.second, render_info, queue_time_ms));
1322  addTemporaryTable(-filter->getId(), exec_desc.getResult().getDataPtr());
1323  return;
1324  }
1325  const auto sort = dynamic_cast<const RelSort*>(body);
1326  if (sort) {
1327  exec_desc.setResult(executeSort(
1328  sort, hint_applied.first, hint_applied.second, render_info, queue_time_ms));
1329  if (exec_desc.getResult().isFilterPushDownEnabled()) {
1330  return;
1331  }
1332  addTemporaryTable(-sort->getId(), exec_desc.getResult().getDataPtr());
1333  return;
1334  }
1335  const auto logical_values = dynamic_cast<const RelLogicalValues*>(body);
1336  if (logical_values) {
1337  exec_desc.setResult(executeLogicalValues(logical_values, hint_applied.second));
1338  addTemporaryTable(-logical_values->getId(), exec_desc.getResult().getDataPtr());
1339  return;
1340  }
1341  const auto modify = dynamic_cast<const RelModify*>(body);
1342  if (modify) {
1343  exec_desc.setResult(executeModify(modify, hint_applied.second));
1344  return;
1345  }
1346  const auto logical_union = dynamic_cast<const RelLogicalUnion*>(body);
1347  if (logical_union) {
1348  exec_desc.setResult(executeUnion(logical_union,
1349  seq,
1350  hint_applied.first,
1351  hint_applied.second,
1352  render_info,
1353  queue_time_ms));
1354  addTemporaryTable(-logical_union->getId(), exec_desc.getResult().getDataPtr());
1355  return;
1356  }
1357  const auto table_func = dynamic_cast<const RelTableFunction*>(body);
1358  if (table_func) {
1359  exec_desc.setResult(executeTableFunction(
1360  table_func, hint_applied.first, hint_applied.second, queue_time_ms));
1361  addTemporaryTable(-table_func->getId(), exec_desc.getResult().getDataPtr());
1362  return;
1363  }
1364  LOG(FATAL) << "Unhandled body type: "
1365  << body->toString(RelRexToStringConfig::defaults());
1366 }
ExecutionResult executeAggregate(const RelAggregate *aggregate, const CompilationOptions &co, const ExecutionOptions &eo, RenderInfo *render_info, const int64_t queue_time_ms)
RaExecutionDesc * getDescriptor(size_t idx) const
bool g_use_query_resultset_cache
Definition: Execute.cpp:148
bool has_valid_query_plan_dag(const RelAlgNode *node)
const bool hasQueryStepForUnion() const
bool g_skip_intermediate_count
#define LOG(tag)
Definition: Logger.h:285
std::vector< size_t > outer_fragment_indices
DEVICE void sort(ARGS &&...args)
Definition: gpu_enabled.h:105
ExecutionResult executeModify(const RelModify *modify, const ExecutionOptions &eo)
void addTemporaryTable(const int table_id, const ResultSetPtr &result)
ExecutionResult executeLogicalValues(const RelLogicalValues *, const ExecutionOptions &)
std::optional< RegisteredQueryHint > getParsedQueryHint(const RelAlgNode *node)
ExecutionResult executeFilter(const RelFilter *, const CompilationOptions &, const ExecutionOptions &, RenderInfo *, const int64_t queue_time_ms)
bool g_enable_data_recycler
Definition: Execute.cpp:146
void handleNop(RaExecutionDesc &ed)
unsigned getId() const
Definition: RelAlgDag.h:880
#define INJECT_TIMER(DESC)
Definition: measure.h:93
void executeUpdate(const RelAlgNode *node, const CompilationOptions &co, const ExecutionOptions &eo, const int64_t queue_time_ms)
const RelAlgNode * getInput(const size_t idx) const
Definition: RelAlgDag.h:892
void build_render_targets(RenderInfo &render_info, const std::vector< Analyzer::Expr * > &work_unit_target_exprs, const std::vector< TargetMetaInfo > &targets_meta)
void executeRelAlgStep(const RaExecutionSequence &seq, const size_t step_idx, const CompilationOptions &, const ExecutionOptions &, RenderInfo *, const int64_t queue_time_ms)
ExecutorDeviceType device_type
ExecutionResult executeTableFunction(const RelTableFunction *, const CompilationOptions &, const ExecutionOptions &, const int64_t queue_time_ms)
const RaExecutionDesc * getContextData() const
Definition: RelAlgDag.h:888
ExecutionResult executeUnion(const RelLogicalUnion *, const RaExecutionSequence &, const CompilationOptions &, const ExecutionOptions &, RenderInfo *, const int64_t queue_time_ms)
static RelRexToStringConfig defaults()
Definition: RelAlgDag.h:49
bool canUseResultsetCache(const ExecutionOptions &eo, RenderInfo *render_info) const
ExecutionResult executeSort(const RelSort *, const CompilationOptions &, const ExecutionOptions &, RenderInfo *, const int64_t queue_time_ms)
ExecutionResult executeProject(const RelProject *, const CompilationOptions &, const ExecutionOptions &, RenderInfo *, const int64_t queue_time_ms, const std::optional< size_t > previous_count)
bool optimize_cuda_block_and_grid_sizes
#define CHECK(condition)
Definition: Logger.h:291
#define DEBUG_TIMER(name)
Definition: Logger.h:411
ExecutionResult executeCompound(const RelCompound *, const CompilationOptions &, const ExecutionOptions &, RenderInfo *, const int64_t queue_time_ms)
bool g_cluster
unsigned dynamic_watchdog_time_limit
void executeDelete(const RelAlgNode *node, const CompilationOptions &co, const ExecutionOptions &eo_in, const int64_t queue_time_ms)
RaExecutionDesc * getDescriptorByBodyId(unsigned const body_id, size_t const start_idx) const
Executor * executor_
bool hasContextData() const
Definition: RelAlgDag.h:886
#define VLOG(n)
Definition: Logger.h:387
void setHasStepForUnion(bool flag)

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

ExecutionResult RelAlgExecutor::executeRelAlgSubSeq ( const RaExecutionSequence seq,
const std::pair< size_t, size_t >  interval,
const CompilationOptions co,
const ExecutionOptions eo,
RenderInfo render_info,
const int64_t  queue_time_ms 
)

Definition at line 1026 of file RelAlgExecutor.cpp.

References CHECK, CompilationOptions::device_type, executeRelAlgStep(), executor_, RenderInfo::forceNonInSitu(), g_allow_query_step_cpu_retry, g_cluster, RaExecutionSequence::getDescriptor(), GPU, logger::INFO, INJECT_TIMER, left_deep_join_info_, LOG, CompilationOptions::makeCpuOnly(), now_, gpu_enabled::swap(), and temporary_tables_.

Referenced by executeRelAlgQuerySingleStep().

1032  {
1034  executor_->temporary_tables_ = &temporary_tables_;
1036  time(&now_);
1037  for (size_t i = interval.first; i < interval.second; i++) {
1038  // only render on the last step
1039  try {
1040  executeRelAlgStep(seq,
1041  i,
1042  co,
1043  eo,
1044  (i == interval.second - 1) ? render_info : nullptr,
1045  queue_time_ms);
1046  } catch (const QueryMustRunOnCpu&) {
1047  // Do not allow per-step retry if flag is off or in distributed mode
1048  // TODO(todd): Determine if and when we can relax this restriction
1049  // for distributed
1052  throw;
1053  }
1054  LOG(INFO) << "Retrying current query step " << i << " on CPU";
1055  const auto co_cpu = CompilationOptions::makeCpuOnly(co);
1056  if (render_info && i == interval.second - 1) {
1057  render_info->forceNonInSitu();
1058  }
1059  executeRelAlgStep(seq,
1060  i,
1061  co_cpu,
1062  eo,
1063  (i == interval.second - 1) ? render_info : nullptr,
1064  queue_time_ms);
1065  }
1066  }
1067 
1068  return seq.getDescriptor(interval.second - 1)->getResult();
1069 }
RaExecutionDesc * getDescriptor(size_t idx) const
void forceNonInSitu()
Definition: RenderInfo.cpp:45
#define LOG(tag)
Definition: Logger.h:285
TemporaryTables temporary_tables_
static CompilationOptions makeCpuOnly(const CompilationOptions &in)
#define INJECT_TIMER(DESC)
Definition: measure.h:93
ExecutionResult executeRelAlgSubSeq(const RaExecutionSequence &seq, const std::pair< size_t, size_t > interval, const CompilationOptions &co, const ExecutionOptions &eo, RenderInfo *render_info, const int64_t queue_time_ms)
void executeRelAlgStep(const RaExecutionSequence &seq, const size_t step_idx, const CompilationOptions &, const ExecutionOptions &, RenderInfo *, const int64_t queue_time_ms)
ExecutorDeviceType device_type
std::unordered_map< unsigned, JoinQualsPerNestingLevel > left_deep_join_info_
#define CHECK(condition)
Definition: Logger.h:291
bool g_allow_query_step_cpu_retry
Definition: Execute.cpp:87
bool g_cluster
Executor * executor_
DEVICE void swap(ARGS &&...args)
Definition: gpu_enabled.h:114

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

ExecutionResult RelAlgExecutor::executeSimpleInsert ( const Analyzer::Query & insert_query,
Fragmenter_Namespace::InsertDataLoader & inserter,
const Catalog_Namespace::SessionInfo & session 
)

Definition at line 2906 of file RelAlgExecutor.cpp.

References append_datum(), DataBlockPtr::arraysPtr, CHECK, CHECK_EQ, checked_malloc(), Fragmenter_Namespace::InsertData::columnIds, ColumnDescriptor::columnType, CPU, Fragmenter_Namespace::InsertData::data, Fragmenter_Namespace::InsertData::databaseId, executor_, import_export::fill_missing_columns(), get_column_descriptor(), SQLTypeInfo::get_compression(), SQLTypeInfo::get_elem_type(), Analyzer::Query::get_result_col_list(), Analyzer::Query::get_result_table_id(), SQLTypeInfo::get_size(), SQLTypeInfo::get_type(), Analyzer::Query::get_values_lists(), Catalog_Namespace::SessionInfo::getCatalog(), Fragmenter_Namespace::InsertDataLoader::getLeafCount(), Catalog_Namespace::Catalog::getMetadataForTable(), inline_fixed_encoding_null_val(), anonymous_namespace{RelAlgExecutor.cpp}::insert_one_dict_str(), Fragmenter_Namespace::InsertDataLoader::insertData(), CacheInvalidator< CACHE_HOLDING_TYPES >::invalidateCachesByTable(), is_null(), SQLTypeInfo::is_string(), kARRAY, kBIGINT, kBOOLEAN, kCAST, kCHAR, kDATE, kDECIMAL, kDOUBLE, kENCODING_DICT, kENCODING_NONE, kFLOAT, kINT, kLINESTRING, kMULTILINESTRING, kMULTIPOINT, kMULTIPOLYGON, kNUMERIC, kPOINT, kPOLYGON, kSMALLINT, kTEXT, kTIME, kTIMESTAMP, kTINYINT, kVARCHAR, DataBlockPtr::numbersPtr, Fragmenter_Namespace::InsertData::numRows, anonymous_namespace{TypedDataAccessors.h}::put_null(), anonymous_namespace{TypedDataAccessors.h}::put_null_array(), DataBlockPtr::stringsPtr, Fragmenter_Namespace::InsertData::tableId, and to_string().

2909  {
2910  // Note: We currently obtain an executor for this method, but we do not need it.
2911  // Therefore, we skip the executor state setup in the regular execution path. In the
2912  // future, we will likely want to use the executor to evaluate expressions in the insert
2913  // statement.
2914 
2915  const auto& values_lists = query.get_values_lists();
2916  const int table_id = query.get_result_table_id();
2917  const auto& col_id_list = query.get_result_col_list();
2918  size_t rows_number = values_lists.size();
2919  size_t leaf_count = inserter.getLeafCount();
2920  const auto& catalog = session.getCatalog();
2921  const auto td = catalog.getMetadataForTable(table_id);
2922  CHECK(td);
2923  size_t rows_per_leaf = rows_number;
2924  if (td->nShards == 0) {
2925  rows_per_leaf =
2926  ceil(static_cast<double>(rows_number) / static_cast<double>(leaf_count));
2927  }
2928  auto max_number_of_rows_per_package =
2929  std::max(size_t(1), std::min(rows_per_leaf, size_t(64 * 1024)));
2930 
2931  std::vector<const ColumnDescriptor*> col_descriptors;
2932  std::vector<int> col_ids;
2933  std::unordered_map<int, std::unique_ptr<uint8_t[]>> col_buffers;
2934  std::unordered_map<int, std::vector<std::string>> str_col_buffers;
2935  std::unordered_map<int, std::vector<ArrayDatum>> arr_col_buffers;
2936  std::unordered_map<int, int> sequential_ids;
2937 
2938  for (const int col_id : col_id_list) {
2939  const auto cd = get_column_descriptor({catalog.getDatabaseId(), table_id, col_id});
2940  const auto col_enc = cd->columnType.get_compression();
2941  if (cd->columnType.is_string()) {
2942  switch (col_enc) {
2943  case kENCODING_NONE: {
2944  auto it_ok =
2945  str_col_buffers.insert(std::make_pair(col_id, std::vector<std::string>{}));
2946  CHECK(it_ok.second);
2947  break;
2948  }
2949  case kENCODING_DICT: {
2950  const auto dd = catalog.getMetadataForDict(cd->columnType.get_comp_param());
2951  CHECK(dd);
2952  const auto it_ok = col_buffers.emplace(
2953  col_id,
2954  std::make_unique<uint8_t[]>(cd->columnType.get_size() *
2955  max_number_of_rows_per_package));
2956  CHECK(it_ok.second);
2957  break;
2958  }
2959  default:
2960  CHECK(false);
2961  }
2962  } else if (cd->columnType.is_geometry()) {
2963  auto it_ok =
2964  str_col_buffers.insert(std::make_pair(col_id, std::vector<std::string>{}));
2965  CHECK(it_ok.second);
2966  } else if (cd->columnType.is_array()) {
2967  auto it_ok =
2968  arr_col_buffers.insert(std::make_pair(col_id, std::vector<ArrayDatum>{}));
2969  CHECK(it_ok.second);
2970  } else {
2971  const auto it_ok = col_buffers.emplace(
2972  col_id,
2973  std::unique_ptr<uint8_t[]>(new uint8_t[cd->columnType.get_logical_size() *
2974  max_number_of_rows_per_package]()));
2975  CHECK(it_ok.second);
2976  }
2977  col_descriptors.push_back(cd);
2978  sequential_ids[col_id] = col_ids.size();
2979  col_ids.push_back(col_id);
2980  }
2981 
2982  // mark the target table's cached item as dirty
2983  std::vector<int> table_chunk_key_prefix{catalog.getCurrentDB().dbId, table_id};
2984  auto table_key = boost::hash_value(table_chunk_key_prefix);
2987 
2988  size_t start_row = 0;
2989  size_t rows_left = rows_number;
2990  while (rows_left != 0) {
2991  // clear the buffers
2992  for (const auto& kv : col_buffers) {
2993  memset(kv.second.get(), 0, max_number_of_rows_per_package);
2994  }
2995  for (auto& kv : str_col_buffers) {
2996  kv.second.clear();
2997  }
2998  for (auto& kv : arr_col_buffers) {
2999  kv.second.clear();
3000  }
3001 
3002  auto package_size = std::min(rows_left, max_number_of_rows_per_package);
3003  // Note: if there will be use cases with batch inserts with lots of rows, it might be
3004  // more efficient to do the loops below column by column instead of row by row.
3005  // But for now I consider such a refactoring not worth investigating, as we have more
3006  // efficient ways to insert many rows anyway.
3007  for (size_t row_idx = 0; row_idx < package_size; ++row_idx) {
3008  const auto& values_list = values_lists[row_idx + start_row];
3009  for (size_t col_idx = 0; col_idx < col_descriptors.size(); ++col_idx) {
3010  CHECK(values_list.size() == col_descriptors.size());
3011  auto col_cv =
3012  dynamic_cast<const Analyzer::Constant*>(values_list[col_idx]->get_expr());
3013  if (!col_cv) {
3014  auto col_cast =
3015  dynamic_cast<const Analyzer::UOper*>(values_list[col_idx]->get_expr());
3016  CHECK(col_cast);
3017  CHECK_EQ(kCAST, col_cast->get_optype());
3018  col_cv = dynamic_cast<const Analyzer::Constant*>(col_cast->get_operand());
3019  }
3020  CHECK(col_cv);
3021  const auto cd = col_descriptors[col_idx];
3022  auto col_datum = col_cv->get_constval();
3023  auto col_type = cd->columnType.get_type();
3024  uint8_t* col_data_bytes{nullptr};
3025  if (!cd->columnType.is_array() && !cd->columnType.is_geometry() &&
3026  (!cd->columnType.is_string() ||
3027  cd->columnType.get_compression() == kENCODING_DICT)) {
3028  const auto col_data_bytes_it = col_buffers.find(col_ids[col_idx]);
3029  CHECK(col_data_bytes_it != col_buffers.end());
3030  col_data_bytes = col_data_bytes_it->second.get();
3031  }
3032  switch (col_type) {
3033  case kBOOLEAN: {
3034  auto col_data = reinterpret_cast<int8_t*>(col_data_bytes);
3035  auto null_bool_val =
3036  col_datum.boolval == inline_fixed_encoding_null_val(cd->columnType);
3037  col_data[row_idx] = col_cv->get_is_null() || null_bool_val
3038  ? inline_fixed_encoding_null_val(cd->columnType)
3039  : (col_datum.boolval ? 1 : 0);
3040  break;
3041  }
3042  case kTINYINT: {
3043  auto col_data = reinterpret_cast<int8_t*>(col_data_bytes);
3044  col_data[row_idx] = col_cv->get_is_null()
3045  ? inline_fixed_encoding_null_val(cd->columnType)
3046  : col_datum.tinyintval;
3047  break;
3048  }
3049  case kSMALLINT: {
3050  auto col_data = reinterpret_cast<int16_t*>(col_data_bytes);
3051  col_data[row_idx] = col_cv->get_is_null()
3052  ? inline_fixed_encoding_null_val(cd->columnType)
3053  : col_datum.smallintval;
3054  break;
3055  }
3056  case kINT: {
3057  auto col_data = reinterpret_cast<int32_t*>(col_data_bytes);
3058  col_data[row_idx] = col_cv->get_is_null()
3059  ? inline_fixed_encoding_null_val(cd->columnType)
3060  : col_datum.intval;
3061  break;
3062  }
3063  case kBIGINT:
3064  case kDECIMAL:
3065  case kNUMERIC: {
3066  auto col_data = reinterpret_cast<int64_t*>(col_data_bytes);
3067  col_data[row_idx] = col_cv->get_is_null()
3068  ? inline_fixed_encoding_null_val(cd->columnType)
3069  : col_datum.bigintval;
3070  break;
3071  }
3072  case kFLOAT: {
3073  auto col_data = reinterpret_cast<float*>(col_data_bytes);
3074  col_data[row_idx] = col_datum.floatval;
3075  break;
3076  }
3077  case kDOUBLE: {
3078  auto col_data = reinterpret_cast<double*>(col_data_bytes);
3079  col_data[row_idx] = col_datum.doubleval;
3080  break;
3081  }
3082  case kTEXT:
3083  case kVARCHAR:
3084  case kCHAR: {
3085  switch (cd->columnType.get_compression()) {
3086  case kENCODING_NONE:
3087  str_col_buffers[col_ids[col_idx]].push_back(
3088  col_datum.stringval ? *col_datum.stringval : "");
3089  break;
3090  case kENCODING_DICT: {
3091  switch (cd->columnType.get_size()) {
3092  case 1:
3094  &reinterpret_cast<uint8_t*>(col_data_bytes)[row_idx],
3095  cd,
3096  col_cv,
3097  catalog);
3098  break;
3099  case 2:
3101  &reinterpret_cast<uint16_t*>(col_data_bytes)[row_idx],
3102  cd,
3103  col_cv,
3104  catalog);
3105  break;
3106  case 4:
3108  &reinterpret_cast<int32_t*>(col_data_bytes)[row_idx],
3109  cd,
3110  col_cv,
3111  catalog);
3112  break;
3113  default:
3114  CHECK(false);
3115  }
3116  break;
3117  }
3118  default:
3119  CHECK(false);
3120  }
3121  break;
3122  }
3123  case kTIME:
3124  case kTIMESTAMP:
3125  case kDATE: {
3126  auto col_data = reinterpret_cast<int64_t*>(col_data_bytes);
3127  col_data[row_idx] = col_cv->get_is_null()
3128  ? inline_fixed_encoding_null_val(cd->columnType)
3129  : col_datum.bigintval;
3130  break;
3131  }
3132  case kARRAY: {
3133  const auto is_null = col_cv->get_is_null();
3134  const auto size = cd->columnType.get_size();
3135  const SQLTypeInfo elem_ti = cd->columnType.get_elem_type();
3136  // POINT coords: [un]compressed coords always need to be encoded, even if NULL
3137  const auto is_point_coords =
3138  (cd->isGeoPhyCol && elem_ti.get_type() == kTINYINT);
3139  if (is_null && !is_point_coords) {
3140  if (size > 0) {
3141  int8_t* buf = (int8_t*)checked_malloc(size);
3142  put_null_array(static_cast<void*>(buf), elem_ti, "");
3143  for (int8_t* p = buf + elem_ti.get_size(); (p - buf) < size;
3144  p += elem_ti.get_size()) {
3145  put_null(static_cast<void*>(p), elem_ti, "");
3146  }
3147  arr_col_buffers[col_ids[col_idx]].emplace_back(size, buf, is_null);
3148  } else {
3149  arr_col_buffers[col_ids[col_idx]].emplace_back(0, nullptr, is_null);
3150  }
3151  break;
3152  }
3153  const auto l = col_cv->get_value_list();
3154  size_t len = l.size() * elem_ti.get_size();
3155  if (size > 0 && static_cast<size_t>(size) != len) {
3156  throw std::runtime_error("Array column " + cd->columnName + " expects " +
3157  std::to_string(size / elem_ti.get_size()) +
3158  " values, " + "received " +
3159  std::to_string(l.size()));
3160  }
3161  if (elem_ti.is_string()) {
3162  CHECK(kENCODING_DICT == elem_ti.get_compression());
3163  CHECK(4 == elem_ti.get_size());
3164 
3165  int8_t* buf = (int8_t*)checked_malloc(len);
3166  int32_t* p = reinterpret_cast<int32_t*>(buf);
3167 
3168  int elemIndex = 0;
3169  for (auto& e : l) {
3170  auto c = std::dynamic_pointer_cast<Analyzer::Constant>(e);
3171  CHECK(c);
3173  &p[elemIndex], cd->columnName, elem_ti, c.get(), catalog);
3174  elemIndex++;
3175  }
3176  arr_col_buffers[col_ids[col_idx]].push_back(ArrayDatum(len, buf, is_null));
3177 
3178  } else {
3179  int8_t* buf = (int8_t*)checked_malloc(len);
3180  int8_t* p = buf;
3181  for (auto& e : l) {
3182  auto c = std::dynamic_pointer_cast<Analyzer::Constant>(e);
3183  CHECK(c);
3184  p = append_datum(p, c->get_constval(), elem_ti);
3185  CHECK(p);
3186  }
3187  arr_col_buffers[col_ids[col_idx]].push_back(ArrayDatum(len, buf, is_null));
3188  }
3189  break;
3190  }
3191  case kPOINT:
3192  case kMULTIPOINT:
3193  case kLINESTRING:
3194  case kMULTILINESTRING:
3195  case kPOLYGON:
3196  case kMULTIPOLYGON:
3197  str_col_buffers[col_ids[col_idx]].push_back(
3198  col_datum.stringval ? *col_datum.stringval : "");
3199  break;
3200  default:
3201  CHECK(false);
3202  }
3203  }
3204  }
3205  start_row += package_size;
3206  rows_left -= package_size;
3207 
3209  insert_data.databaseId = catalog.getCurrentDB().dbId;
3210  insert_data.tableId = table_id;
3211  insert_data.data.resize(col_ids.size());
3212  insert_data.columnIds = col_ids;
3213  for (const auto& kv : col_buffers) {
3214  DataBlockPtr p;
3215  p.numbersPtr = reinterpret_cast<int8_t*>(kv.second.get());
3216  insert_data.data[sequential_ids[kv.first]] = p;
3217  }
3218  for (auto& kv : str_col_buffers) {
3219  DataBlockPtr p;
3220  p.stringsPtr = &kv.second;
3221  insert_data.data[sequential_ids[kv.first]] = p;
3222  }
3223  for (auto& kv : arr_col_buffers) {
3224  DataBlockPtr p;
3225  p.arraysPtr = &kv.second;
3226  insert_data.data[sequential_ids[kv.first]] = p;
3227  }
3228  insert_data.numRows = package_size;
3229  auto data_memory_holder = import_export::fill_missing_columns(&catalog, insert_data);
3230  inserter.insertData(session, insert_data);
3231  }
3232 
3233  auto rs = std::make_shared<ResultSet>(TargetInfoList{},
3236  executor_->getRowSetMemoryOwner(),
3237  0,
3238  0);
3239  std::vector<TargetMetaInfo> empty_targets;
3240  return {rs, empty_targets};
3241 }
static void invalidateCachesByTable(size_t table_key)
#define CHECK_EQ(x, y)
Definition: Logger.h:301
HOST DEVICE int get_size() const
Definition: sqltypes.h:393
int8_t * append_datum(int8_t *buf, const Datum &d, const SQLTypeInfo &ti)
Definition: Datum.cpp:578
Definition: sqltypes.h:66
std::vector< std::string > * stringsPtr
Definition: sqltypes.h:224
std::vector< ArrayDatum > * arraysPtr
Definition: sqltypes.h:225
std::vector< std::unique_ptr< TypedImportBuffer > > fill_missing_columns(const Catalog_Namespace::Catalog *cat, Fragmenter_Namespace::InsertData &insert_data)
Definition: Importer.cpp:6230
int64_t insert_one_dict_str(T *col_data, const std::string &columnName, const SQLTypeInfo &columnType, const Analyzer::Constant *col_cv, const Catalog_Namespace::Catalog &catalog)
Definition: sqldefs.h:48
std::vector< TargetInfo > TargetInfoList
HOST DEVICE SQLTypes get_type() const
Definition: sqltypes.h:381
void insertData(const Catalog_Namespace::SessionInfo &session_info, InsertData &insert_data)
std::string to_string(char const *&&v)
int tableId
identifies the table into which the data is being inserted
Definition: Fragmenter.h:70
std::conditional_t< is_cuda_compiler(), DeviceArrayDatum, HostArrayDatum > ArrayDatum
Definition: sqltypes.h:219
size_t numRows
the number of rows being inserted
Definition: Fragmenter.h:72
CONSTEXPR DEVICE bool is_null(const T &value)
void * checked_malloc(const size_t size)
Definition: checked_alloc.h:45
void put_null_array(void *ndptr, const SQLTypeInfo &ntype, const std::string col_name)
const ColumnDescriptor * get_column_descriptor(const shared::ColumnKey &column_key)
Definition: Execute.h:192
void put_null(void *ndptr, const SQLTypeInfo &ntype, const std::string col_name)
Definition: sqltypes.h:69
Definition: sqltypes.h:70
HOST DEVICE EncodingType get_compression() const
Definition: sqltypes.h:389
std::vector< DataBlockPtr > data
the data being inserted, one DataBlockPtr per column (in columnIds order)
Definition: Fragmenter.h:73
Catalog & getCatalog() const
Definition: SessionInfo.h:75
Definition: sqltypes.h:58
#define CHECK(condition)
Definition: Logger.h:291
int64_t inline_fixed_encoding_null_val(const SQL_TYPE_INFO &ti)
The data to be inserted using the fragment manager.
Definition: Fragmenter.h:68
Definition: sqltypes.h:62
SQLTypeInfo columnType
const TableDescriptor * getMetadataForTable(const std::string &tableName, const bool populateFragmenter=true) const
Returns a pointer to a const TableDescriptor struct matching the provided tableName.
bool is_string() const
Definition: sqltypes.h:580
int8_t * numbersPtr
Definition: sqltypes.h:223
SQLTypeInfo get_elem_type() const
Definition: sqltypes.h:963
std::vector< int > columnIds
a vector of column ids for the row(s) being inserted
Definition: Fragmenter.h:71
Executor * executor_

+ Here is the call graph for this function:

ExecutionResult RelAlgExecutor::executeSort ( const RelSort * sort,
const CompilationOptions & co,
const ExecutionOptions & eo,
RenderInfo * render_info,
const int64_t  queue_time_ms 
)
private

Definition at line 3260 of file RelAlgExecutor.cpp.

References SpeculativeTopNBlacklist::add(), addTemporaryTable(), anonymous_namespace{RelAlgExecutor.cpp}::build_render_targets(), canUseResultsetCache(), CHECK, CHECK_EQ, anonymous_namespace{RelAlgExecutor.cpp}::check_sort_node_source_constraint(), RelSort::collationCount(), createSortInputWorkUnit(), DEBUG_TIMER, executeWorkUnit(), executor_, anonymous_namespace{RelAlgExecutor.cpp}::first_oe_is_desc(), g_cluster, anonymous_namespace{RelAlgExecutor.cpp}::get_order_entries(), ExecutionResult::getDataPtr(), RelAlgNode::getId(), RelAlgNode::getInput(), RelSort::getLimit(), RelSort::getOffset(), GPU, anonymous_namespace{RelAlgExecutor.cpp}::has_valid_query_plan_dag(), RelSort::isEmptyResult(), leaf_results_, anonymous_namespace{RelAlgExecutor.cpp}::node_is_aggregate(), ExecutionOptions::output_columnar_hint, run_benchmark_import::result, RelAlgNode::setOutputMetainfo(), gpu_enabled::sort(), speculative_topn_blacklist_, temporary_tables_, use_speculative_top_n(), and VLOG.

Referenced by executeRelAlgStep().

3264  {
3265  auto timer = DEBUG_TIMER(__func__);
3267  const auto source = sort->getInput(0);
3268  const bool is_aggregate = node_is_aggregate(source);
3269  auto it = leaf_results_.find(sort->getId());
3270  auto order_entries = get_order_entries(sort);
3271  if (it != leaf_results_.end()) {
3272  // Add any transient string literals to the sdp on the agg
3273  const auto source_work_unit = createSortInputWorkUnit(sort, order_entries, eo);
3274  executor_->addTransientStringLiterals(source_work_unit.exe_unit,
3275  executor_->row_set_mem_owner_);
3276  // Handle push-down for LIMIT for multi-node
3277  auto& aggregated_result = it->second;
3278  auto& result_rows = aggregated_result.rs;
3279  const size_t limit = sort->getLimit();
3280  const size_t offset = sort->getOffset();
3281  if (limit || offset) {
3282  if (!order_entries.empty()) {
3283  result_rows->sort(order_entries, limit + offset, executor_);
3284  }
3285  result_rows->dropFirstN(offset);
3286  if (limit) {
3287  result_rows->keepFirstN(limit);
3288  }
3289  }
3290 
3291  if (render_info) {
3292  // We've hit a sort step that is the very last step
3293  // in a distributed render query. We'll fill in the render targets
3294  // since we have all that data needed to do so. This is normally
3295  // done in executeWorkUnit, but that is bypassed in this case.
3296  build_render_targets(*render_info,
3297  source_work_unit.exe_unit.target_exprs,
3298  aggregated_result.targets_meta);
3299  }
3300 
3301  ExecutionResult result(result_rows, aggregated_result.targets_meta);
3302  sort->setOutputMetainfo(aggregated_result.targets_meta);
3303 
3304  return result;
3305  }
3306 
3307  std::list<std::shared_ptr<Analyzer::Expr>> groupby_exprs;
3308  bool is_desc{false};
3309  bool use_speculative_top_n_sort{false};
3310 
3311  auto execute_sort_query = [this,
3312  sort,
3313  &source,
3314  &is_aggregate,
3315  &eo,
3316  &co,
3317  render_info,
3318  queue_time_ms,
3319  &groupby_exprs,
3320  &is_desc,
3321  &order_entries,
3322  &use_speculative_top_n_sort]() -> ExecutionResult {
3323  const size_t limit = sort->getLimit();
3324  const size_t offset = sort->getOffset();
3325  // check whether sort's input is cached
3326  auto source_node = sort->getInput(0);
3327  CHECK(source_node);
3328  ExecutionResult source_result{nullptr, {}};
3329  auto source_query_plan_dag = source_node->getQueryPlanDagHash();
3330  bool enable_resultset_recycler = canUseResultsetCache(eo, render_info);
3331  if (enable_resultset_recycler && has_valid_query_plan_dag(source_node) &&
3332  !sort->isEmptyResult()) {
3333  if (auto cached_resultset =
3334  executor_->getRecultSetRecyclerHolder().getCachedQueryResultSet(
3335  source_query_plan_dag)) {
3336  CHECK(cached_resultset->canUseSpeculativeTopNSort());
3337  VLOG(1) << "recycle resultset of the root node " << source_node->getRelNodeDagId()
3338  << " from resultset cache";
3339  source_result =
3340  ExecutionResult{cached_resultset, cached_resultset->getTargetMetaInfo()};
3341  if (temporary_tables_.find(-source_node->getId()) == temporary_tables_.end()) {
3342  addTemporaryTable(-source_node->getId(), cached_resultset);
3343  }
3344  use_speculative_top_n_sort = *cached_resultset->canUseSpeculativeTopNSort() &&
3345  co.device_type == ExecutorDeviceType::GPU;
3346  source_node->setOutputMetainfo(cached_resultset->getTargetMetaInfo());
3347  sort->setOutputMetainfo(source_node->getOutputMetainfo());
3348  }
3349  }
3350  if (!source_result.getDataPtr()) {
3351  const auto source_work_unit = createSortInputWorkUnit(sort, order_entries, eo);
3352  is_desc = first_oe_is_desc(order_entries);
3353  ExecutionOptions eo_copy = {
3355  eo.keep_result,
3356  eo.allow_multifrag,
3357  eo.just_explain,
3358  eo.allow_loop_joins,
3359  eo.with_watchdog,
3360  eo.jit_debug,
3361  eo.just_validate || sort->isEmptyResult(),
3362  eo.with_dynamic_watchdog,
3363  eo.dynamic_watchdog_time_limit,
3364  eo.find_push_down_candidates,
3365  eo.just_calcite_explain,
3366  eo.gpu_input_mem_limit_percent,
3367  eo.allow_runtime_query_interrupt,
3368  eo.running_query_interrupt_freq,
3369  eo.pending_query_interrupt_freq,
3370  eo.optimize_cuda_block_and_grid_sizes,
3371  eo.max_join_hash_table_size,
3372  eo.executor_type,
3373  };
3374 
3375  groupby_exprs = source_work_unit.exe_unit.groupby_exprs;
3376  source_result = executeWorkUnit(source_work_unit,
3377  source->getOutputMetainfo(),
3378  is_aggregate,
3379  co,
3380  eo_copy,
3381  render_info,
3382  queue_time_ms);
3383  use_speculative_top_n_sort =
3384  source_result.getDataPtr() && source_result.getRows()->hasValidBuffer() &&
3385  use_speculative_top_n(source_work_unit.exe_unit,
3386  source_result.getRows()->getQueryMemDesc());
3387  }
3388  if (render_info && render_info->isInSitu()) {
3389  return source_result;
3390  }
3391  if (source_result.isFilterPushDownEnabled()) {
3392  return source_result;
3393  }
3394  auto rows_to_sort = source_result.getRows();
3395  if (eo.just_explain) {
3396  return {rows_to_sort, {}};
3397  }
3398  if (sort->collationCount() != 0 && !rows_to_sort->definitelyHasNoRows() &&
3399  !use_speculative_top_n_sort) {
3400  const size_t top_n = limit == 0 ? 0 : limit + offset;
3401  rows_to_sort->sort(order_entries, top_n, executor_);
3402  }
3403  if (limit || offset) {
3404  if (g_cluster && sort->collationCount() == 0) {
3405  if (offset >= rows_to_sort->rowCount()) {
3406  rows_to_sort->dropFirstN(offset);
3407  } else {
3408  rows_to_sort->keepFirstN(limit + offset);
3409  }
3410  } else {
3411  rows_to_sort->dropFirstN(offset);
3412  if (limit) {
3413  rows_to_sort->keepFirstN(limit);
3414  }
3415  }
3416  }
3417  return {rows_to_sort, source_result.getTargetsMeta()};
3418  };
3419 
3420  try {
3421  return execute_sort_query();
3422  } catch (const SpeculativeTopNFailed& e) {
3423  CHECK_EQ(size_t(1), groupby_exprs.size());
3424  CHECK(groupby_exprs.front());
3425  speculative_topn_blacklist_.add(groupby_exprs.front(), is_desc);
3426  return execute_sort_query();
3427  }
3428 }
void setOutputMetainfo(std::vector< TargetMetaInfo > targets_metainfo) const
Definition: RelAlgDag.h:861
#define CHECK_EQ(x, y)
Definition: Logger.h:301
size_t getOffset() const
Definition: RelAlgDag.h:1971
bool has_valid_query_plan_dag(const RelAlgNode *node)
static SpeculativeTopNBlacklist speculative_topn_blacklist_
TemporaryTables temporary_tables_
bool use_speculative_top_n(const RelAlgExecutionUnit &ra_exe_unit, const QueryMemoryDescriptor &query_mem_desc)
DEVICE void sort(ARGS &&...args)
Definition: gpu_enabled.h:105
void addTemporaryTable(const int table_id, const ResultSetPtr &result)
void check_sort_node_source_constraint(const RelSort *sort)
std::unordered_map< unsigned, AggregatedResult > leaf_results_
const ResultSetPtr & getDataPtr() const
unsigned getId() const
Definition: RelAlgDag.h:880
const RelAlgNode * getInput(const size_t idx) const
Definition: RelAlgDag.h:892
void build_render_targets(RenderInfo &render_info, const std::vector< Analyzer::Expr * > &work_unit_target_exprs, const std::vector< TargetMetaInfo > &targets_meta)
bool node_is_aggregate(const RelAlgNode *ra)
bool isEmptyResult() const
Definition: RelAlgDag.h:1965
ExecutionResult executeWorkUnit(const WorkUnit &work_unit, const std::vector< TargetMetaInfo > &targets_meta, const bool is_agg, const CompilationOptions &co_in, const ExecutionOptions &eo_in, RenderInfo *, const int64_t queue_time_ms, const std::optional< size_t > previous_count=std::nullopt)
size_t collationCount() const
Definition: RelAlgDag.h:1952
size_t getLimit() const
Definition: RelAlgDag.h:1969
bool canUseResultsetCache(const ExecutionOptions &eo, RenderInfo *render_info) const
#define CHECK(condition)
Definition: Logger.h:291
#define DEBUG_TIMER(name)
Definition: Logger.h:411
std::list< Analyzer::OrderEntry > get_order_entries(const RelSort *sort)
bool g_cluster
void add(const std::shared_ptr< Analyzer::Expr > expr, const bool desc)
Executor * executor_
#define VLOG(n)
Definition: Logger.h:387
bool first_oe_is_desc(const std::list< Analyzer::OrderEntry > &order_entries)
WorkUnit createSortInputWorkUnit(const RelSort *, std::list< Analyzer::OrderEntry > &order_entries, const ExecutionOptions &eo)

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

ExecutionResult RelAlgExecutor::executeTableFunction ( const RelTableFunction * table_func,
const CompilationOptions & co_in,
const ExecutionOptions & eo,
const int64_t  queue_time_ms 
)
private

Definition at line 2402 of file RelAlgExecutor.cpp.

References addTemporaryTable(), canUseResultsetCache(), CHECK, createTableFunctionWorkUnit(), DEBUG_TIMER, Executor::ERR_OUT_OF_GPU_MEM, executor_, g_allow_auto_resultset_caching, g_auto_resultset_caching_threshold, g_cluster, g_enable_table_functions, get_table_infos(), QueryExecutionError::getErrorCode(), getGlobalQueryHint(), RelAlgNode::getQueryPlanDagHash(), RelAlgNode::getRelNodeDagId(), ScanNodeTableKeyCollector::getScanNodeTableKey(), GPU, handlePersistentError(), anonymous_namespace{RelAlgExecutor.cpp}::has_valid_query_plan_dag(), hasStepForUnion(), INJECT_TIMER, anonymous_namespace{RelAlgExecutor.cpp}::is_validate_or_explain_query(), ExecutionOptions::just_explain, ExecutionOptions::keep_result, kKeepTableFuncResult, run_benchmark_import::result, target_exprs_owned_, timer_start(), timer_stop(), and VLOG.

Referenced by executeRelAlgStep().

2405  {
2407  auto timer = DEBUG_TIMER(__func__);
2408 
2409  auto co = co_in;
2410 
2411  if (g_cluster) {
2412  throw std::runtime_error("Table functions not supported in distributed mode yet");
2413  }
2414  if (!g_enable_table_functions) {
2415  throw std::runtime_error("Table function support is disabled");
2416  }
2417  auto table_func_work_unit = createTableFunctionWorkUnit(
2418  table_func,
2419  eo.just_explain,
2420  /*is_gpu = */ co.device_type == ExecutorDeviceType::GPU);
2421  const auto body = table_func_work_unit.body;
2422  CHECK(body);
2423 
2424  const auto table_infos =
2425  get_table_infos(table_func_work_unit.exe_unit.input_descs, executor_);
2426 
2427  ExecutionResult result{std::make_shared<ResultSet>(std::vector<TargetInfo>{},
2428  co.device_type,
2430  nullptr,
2431  executor_->blockSize(),
2432  executor_->gridSize()),
2433  {}};
2434 
2435  auto global_hint = getGlobalQueryHint();
2436  auto use_resultset_recycler = canUseResultsetCache(eo, nullptr);
2437  if (use_resultset_recycler && has_valid_query_plan_dag(table_func)) {
2438  auto cached_resultset =
2439  executor_->getRecultSetRecyclerHolder().getCachedQueryResultSet(
2440  table_func->getQueryPlanDagHash());
2441  if (cached_resultset) {
2442  VLOG(1) << "recycle table function's resultset of the root node "
2443  << table_func->getRelNodeDagId() << " from resultset cache";
2444  result = {cached_resultset, cached_resultset->getTargetMetaInfo()};
2445  addTemporaryTable(-body->getId(), result.getDataPtr());
2446  return result;
2447  }
2448  }
2449 
2450  auto query_exec_time_begin = timer_start();
2451  try {
2452  result = {executor_->executeTableFunction(
2453  table_func_work_unit.exe_unit, table_infos, co, eo),
2454  body->getOutputMetainfo()};
2455  } catch (const QueryExecutionError& e) {
2458  throw std::runtime_error("Table function ran out of memory during execution");
2459  }
2460  auto query_exec_time = timer_stop(query_exec_time_begin);
2461  result.setQueueTime(queue_time_ms);
2462  auto resultset_ptr = result.getDataPtr();
2463  auto allow_auto_caching_resultset = resultset_ptr && resultset_ptr->hasValidBuffer() &&
2465  resultset_ptr->getBufferSizeBytes(co.device_type) <=
2467  bool keep_result = global_hint->isHintRegistered(QueryHint::kKeepTableFuncResult);
2468  if (use_resultset_recycler && (keep_result || allow_auto_caching_resultset) &&
2469  !hasStepForUnion()) {
2470  resultset_ptr->setExecTime(query_exec_time);
2471  resultset_ptr->setQueryPlanHash(table_func_work_unit.exe_unit.query_plan_dag_hash);
2472  resultset_ptr->setTargetMetaInfo(body->getOutputMetainfo());
2473  auto input_table_keys = ScanNodeTableKeyCollector::getScanNodeTableKey(body);
2474  resultset_ptr->setInputTableKeys(std::move(input_table_keys));
2475  if (allow_auto_caching_resultset) {
2476  VLOG(1) << "Automatically keep table function's query resultset to recycler";
2477  }
2478  executor_->getRecultSetRecyclerHolder().putQueryResultSetToCache(
2479  table_func_work_unit.exe_unit.query_plan_dag_hash,
2480  resultset_ptr->getInputTableKeys(),
2481  resultset_ptr,
2482  resultset_ptr->getBufferSizeBytes(co.device_type),
2484  } else {
2485  if (eo.keep_result) {
2486  if (g_cluster) {
2487  VLOG(1) << "Query hint \'keep_table_function_result\' is ignored since we do not "
2488  "support resultset recycling on distributed mode";
2489  } else if (hasStepForUnion()) {
2490  VLOG(1) << "Query hint \'keep_table_function_result\' is ignored since a query "
2491  "has union-(all) operator";
2492  } else if (is_validate_or_explain_query(eo)) {
2493  VLOG(1) << "Query hint \'keep_table_function_result\' is ignored since a query "
2494  "is either validate or explain query";
2495  } else {
2496  VLOG(1) << "Query hint \'keep_table_function_result\' is ignored";
2497  }
2498  }
2499  }
2500 
2501  return result;
2502 }
int32_t getErrorCode() const
Definition: ErrorHandling.h:55
bool has_valid_query_plan_dag(const RelAlgNode *node)
TableFunctionWorkUnit createTableFunctionWorkUnit(const RelTableFunction *table_func, const bool just_explain, const bool is_gpu)
TypeR::rep timer_stop(Type clock_begin)
Definition: measure.h:48
void addTemporaryTable(const int table_id, const ResultSetPtr &result)
static std::unordered_set< size_t > getScanNodeTableKey(RelAlgNode const *rel_alg_node)
bool hasStepForUnion() const
size_t getQueryPlanDagHash() const
Definition: RelAlgDag.h:874
std::vector< std::shared_ptr< Analyzer::Expr > > target_exprs_owned_
#define INJECT_TIMER(DESC)
Definition: measure.h:93
bool g_allow_auto_resultset_caching
Definition: Execute.cpp:150
static const int32_t ERR_OUT_OF_GPU_MEM
Definition: Execute.h:1429
std::optional< RegisteredQueryHint > getGlobalQueryHint()
ExecutionResult executeTableFunction(const RelTableFunction *, const CompilationOptions &, const ExecutionOptions &, const int64_t queue_time_ms)
bool is_validate_or_explain_query(const ExecutionOptions &eo)
bool canUseResultsetCache(const ExecutionOptions &eo, RenderInfo *render_info) const
static void handlePersistentError(const int32_t error_code)
#define CHECK(condition)
Definition: Logger.h:291
std::vector< InputTableInfo > get_table_infos(const std::vector< InputDescriptor > &input_descs, Executor *executor)
#define DEBUG_TIMER(name)
Definition: Logger.h:411
bool g_cluster
size_t getRelNodeDagId() const
Definition: RelAlgDag.h:928
Executor * executor_
#define VLOG(n)
Definition: Logger.h:387
Type timer_start()
Definition: measure.h:42
size_t g_auto_resultset_caching_threshold
Definition: Execute.cpp:156
bool g_enable_table_functions
Definition: Execute.cpp:112

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

ExecutionResult RelAlgExecutor::executeUnion ( const RelLogicalUnion * logical_union,
const RaExecutionSequence & seq,
const CompilationOptions & co,
const ExecutionOptions & eo,
RenderInfo * render_info,
const int64_t  queue_time_ms 
)
private

Definition at line 2769 of file RelAlgExecutor.cpp.

References anonymous_namespace{QueryMemoryDescriptor.cpp}::any_of(), createUnionWorkUnit(), DEBUG_TIMER, Default, executeWorkUnit(), RelLogicalUnion::getCompatibleMetainfoTypes(), RelAlgNode::getOutputMetainfo(), RelLogicalUnion::isAll(), isGeometry(), CompilationOptions::makeCpuOnly(), and RelAlgNode::setOutputMetainfo().

Referenced by executeRelAlgStep().

2774  {
2775  auto timer = DEBUG_TIMER(__func__);
2776  if (!logical_union->isAll()) {
2777  throw std::runtime_error("UNION without ALL is not supported yet.");
2778  }
2779  // Will throw a std::runtime_error if types don't match.
2780  logical_union->setOutputMetainfo(logical_union->getCompatibleMetainfoTypes());
2781  if (boost::algorithm::any_of(logical_union->getOutputMetainfo(), isGeometry)) {
2782  throw std::runtime_error("UNION does not support subqueries with geo-columns.");
2783  }
2784  auto work_unit =
2785  createUnionWorkUnit(logical_union, {{}, SortAlgorithm::Default, 0, 0}, eo);
2786  return executeWorkUnit(work_unit,
2787  logical_union->getOutputMetainfo(),
2788  false,
2789  CompilationOptions::makeCpuOnly(co),
2790  eo,
2791  render_info,
2792  queue_time_ms);
2793 }
bool isAll() const
Definition: RelAlgDag.h:2477
void setOutputMetainfo(std::vector< TargetMetaInfo > targets_metainfo) const
Definition: RelAlgDag.h:861
static CompilationOptions makeCpuOnly(const CompilationOptions &in)
bool isGeometry(TargetMetaInfo const &target_meta_info)
WorkUnit createUnionWorkUnit(const RelLogicalUnion *, const SortInfo &, const ExecutionOptions &eo)
std::vector< TargetMetaInfo > getCompatibleMetainfoTypes() const
Definition: RelAlgDag.cpp:911
ExecutionResult executeWorkUnit(const WorkUnit &work_unit, const std::vector< TargetMetaInfo > &targets_meta, const bool is_agg, const CompilationOptions &co_in, const ExecutionOptions &eo_in, RenderInfo *, const int64_t queue_time_ms, const std::optional< size_t > previous_count=std::nullopt)
#define DEBUG_TIMER(name)
Definition: Logger.h:411
bool any_of(std::vector< Analyzer::Expr * > const &target_exprs)
const std::vector< TargetMetaInfo > & getOutputMetainfo() const
Definition: RelAlgDag.h:876

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void RelAlgExecutor::executeUpdate ( const RelAlgNode * node,
const CompilationOptions & co,
const ExecutionOptions & eo,
const int64_t  queue_time_ms 
)
private

Definition at line 2055 of file RelAlgExecutor.cpp.

References CompilationOptions::allow_lazy_fetch, CHECK, CHECK_EQ, Executor::clearExternalCaches(), computeWindow(), CPU, createCompoundWorkUnit(), createProjectWorkUnit(), DEBUG_TIMER, Default, RelRexToStringConfig::defaults(), dml_transaction_parameters_, executor_, CompilationOptions::filter_on_deleted_column, get_table_infos(), get_temporary_table(), QueryExecutionError::getErrorCode(), getErrorMessageFromCode(), CompilationOptions::hoist_literals, leaf_results_, CompilationOptions::makeCpuOnly(), post_execution_callback_, StorageIOFacility::TransactionParameters::setInputSourceNode(), temporary_tables_, and StorageIOFacility::yieldUpdateCallback().

Referenced by executeRelAlgStep().

2058  {
2059  CHECK(node);
2060  auto timer = DEBUG_TIMER(__func__);
2061 
2062  auto co = co_in;
2063  co.hoist_literals = false; // disable literal hoisting as it interferes with dict
2064  // encoded string updates
2065 
2066  auto execute_update_for_node = [this, &co, &eo_in](const auto node,
2067  auto& work_unit,
2068  const bool is_aggregate) {
2069  auto table_descriptor = node->getModifiedTableDescriptor();
2070  CHECK(table_descriptor);
2071  if (node->isVarlenUpdateRequired() && !table_descriptor->hasDeletedCol) {
2072  throw std::runtime_error(
2073  "UPDATE queries involving variable length columns are only supported on tables "
2074  "with the vacuum attribute set to 'delayed'");
2075  }
2076 
2077  auto catalog = node->getModifiedTableCatalog();
2078  CHECK(catalog);
2079  Executor::clearExternalCaches(true, table_descriptor, catalog->getDatabaseId());
2080 
2081  dml_transaction_parameters_ =
2082  std::make_unique<UpdateTransactionParameters>(table_descriptor,
2083  *catalog,
2084  node->getTargetColumns(),
2085  node->getOutputMetainfo(),
2086  node->isVarlenUpdateRequired());
2087 
2088  const auto table_infos = get_table_infos(work_unit.exe_unit, executor_);
2089 
2090  auto execute_update_ra_exe_unit =
2091  [this, &co, &eo_in, &table_infos, &table_descriptor, &node, catalog](
2092  const RelAlgExecutionUnit& ra_exe_unit, const bool is_aggregate) {
2093  CompilationOptions co_project = CompilationOptions::makeCpuOnly(co);
2094 
2095  auto eo = eo_in;
2096  if (dml_transaction_parameters_->tableIsTemporary()) {
2097  eo.output_columnar_hint = true;
2098  co_project.allow_lazy_fetch = false;
2099  co_project.filter_on_deleted_column =
2100  false; // project the entire delete column for columnar update
2101  }
2102 
2103  auto update_transaction_parameters = dynamic_cast<UpdateTransactionParameters*>(
2104  dml_transaction_parameters_.get());
2105  update_transaction_parameters->setInputSourceNode(node);
2106  CHECK(update_transaction_parameters);
2107  auto update_callback = yieldUpdateCallback(*update_transaction_parameters);
2108  try {
2109  auto table_update_metadata =
2110  executor_->executeUpdate(ra_exe_unit,
2111  table_infos,
2112  table_descriptor,
2113  co_project,
2114  eo,
2115  *catalog,
2116  executor_->row_set_mem_owner_,
2117  update_callback,
2118  is_aggregate);
2119  post_execution_callback_ = [table_update_metadata, this, catalog]() {
2120  dml_transaction_parameters_->finalizeTransaction(*catalog);
2121  TableOptimizer table_optimizer{
2122  dml_transaction_parameters_->getTableDescriptor(), executor_, *catalog};
2123  table_optimizer.vacuumFragmentsAboveMinSelectivity(table_update_metadata);
2124  };
2125  } catch (const QueryExecutionError& e) {
2126  throw std::runtime_error(getErrorMessageFromCode(e.getErrorCode()));
2127  }
2128  };
2129 
2130  if (dml_transaction_parameters_->tableIsTemporary()) {
2131  // hold owned target exprs during execution if rewriting
2132  auto query_rewrite = std::make_unique<QueryRewriter>(table_infos,