OmniSciDB  a5dc49c757
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
RelAlgExecutor Class Reference

#include <RelAlgExecutor.h>

+ Inheritance diagram for RelAlgExecutor:
+ Collaboration diagram for RelAlgExecutor:

Classes

struct  TableFunctionWorkUnit
 
struct  WorkUnit
 

Public Types

using TargetInfoList = std::vector< TargetInfo >
 

Public Member Functions

 RelAlgExecutor (Executor *executor, std::shared_ptr< const query_state::QueryState > query_state=nullptr)
 
 RelAlgExecutor (Executor *executor, const std::string &query_ra, std::shared_ptr< const query_state::QueryState > query_state=nullptr)
 
 RelAlgExecutor (Executor *executor, std::unique_ptr< RelAlgDag > query_dag, std::shared_ptr< const query_state::QueryState > query_state=nullptr)
 
size_t getOuterFragmentCount (const CompilationOptions &co, const ExecutionOptions &eo)
 
ExecutionResult executeRelAlgQuery (const CompilationOptions &co, const ExecutionOptions &eo, const bool just_explain_plan, const bool explain_verbose, RenderInfo *render_info)
 
ExecutionResult executeRelAlgQueryWithFilterPushDown (const RaExecutionSequence &seq, const CompilationOptions &co, const ExecutionOptions &eo, RenderInfo *render_info, const int64_t queue_time_ms)
 
void prepareLeafExecution (const AggregatedColRange &agg_col_range, const StringDictionaryGenerations &string_dictionary_generations, const TableGenerations &table_generations)
 
ExecutionResult executeRelAlgSeq (const RaExecutionSequence &seq, const CompilationOptions &co, const ExecutionOptions &eo, RenderInfo *render_info, const int64_t queue_time_ms, const bool with_existing_temp_tables=false)
 
ExecutionResult executeRelAlgSubSeq (const RaExecutionSequence &seq, const std::pair< size_t, size_t > interval, const CompilationOptions &co, const ExecutionOptions &eo, RenderInfo *render_info, const int64_t queue_time_ms)
 
QueryStepExecutionResult executeRelAlgQuerySingleStep (const RaExecutionSequence &seq, const size_t step_idx, const CompilationOptions &co, const ExecutionOptions &eo, RenderInfo *render_info)
 
void addLeafResult (const unsigned id, const AggregatedResult &result)
 
std::unique_ptr< RelAlgDag > getOwnedRelAlgDag ()
 
RelAlgDag * getRelAlgDag ()
 
const RelAlgNode & getRootRelAlgNode () const
 
void prepareForeignTables ()
 
std::shared_ptr< const RelAlgNode > getRootRelAlgNodeShPtr () const
 
std::pair< std::vector
< unsigned >
, std::unordered_map< unsigned,
JoinQualsPerNestingLevel > > 
getJoinInfo (const RelAlgNode *root_node)
 
std::shared_ptr< RelAlgTranslator > getRelAlgTranslator (const RelAlgNode *root_node)
 
const std::vector
< std::shared_ptr< RexSubQuery > > & 
getSubqueries () const noexcept
 
std::optional
< RegisteredQueryHint > 
getParsedQueryHint (const RelAlgNode *node)
 
std::optional
< std::unordered_map< size_t,
std::unordered_map< unsigned,
RegisteredQueryHint > > > 
getParsedQueryHints ()
 
std::optional
< RegisteredQueryHint > 
getGlobalQueryHint ()
 
ExecutionResult executeSimpleInsert (const Analyzer::Query &insert_query, Fragmenter_Namespace::InsertDataLoader &inserter, const Catalog_Namespace::SessionInfo &session)
 
AggregatedColRange computeColRangesCache ()
 
StringDictionaryGenerations computeStringDictionaryGenerations ()
 
TableGenerations computeTableGenerations ()
 
Executor * getExecutor () const
 
void cleanupPostExecution ()
 
void executePostExecutionCallback ()
 
RaExecutionSequence getRaExecutionSequence (const RelAlgNode *root_node, Executor *executor)
 
void prepareForeignTable ()
 
std::unordered_set
< shared::TableKey > 
getPhysicalTableIds () const
 
void prepareForSystemTableExecution (const CompilationOptions &co) const
 

Static Public Member Functions

static std::string getErrorMessageFromCode (const int32_t error_code)
 

Private Member Functions

void initializeParallelismHints ()
 
ExecutionResult executeRelAlgQueryNoRetry (const CompilationOptions &co, const ExecutionOptions &eo, const bool just_explain_plan, const bool explain_verbose, RenderInfo *render_info)
 
void executeRelAlgStep (const RaExecutionSequence &seq, const size_t step_idx, const CompilationOptions &, const ExecutionOptions &, RenderInfo *, const int64_t queue_time_ms)
 
void executeUpdate (const RelAlgNode *node, const CompilationOptions &co, const ExecutionOptions &eo, const int64_t queue_time_ms)
 
void executeDelete (const RelAlgNode *node, const CompilationOptions &co, const ExecutionOptions &eo_in, const int64_t queue_time_ms)
 
ExecutionResult executeCompound (const RelCompound *, const CompilationOptions &, const ExecutionOptions &, RenderInfo *, const int64_t queue_time_ms)
 
ExecutionResult executeAggregate (const RelAggregate *aggregate, const CompilationOptions &co, const ExecutionOptions &eo, RenderInfo *render_info, const int64_t queue_time_ms)
 
ExecutionResult executeProject (const RelProject *, const CompilationOptions &, const ExecutionOptions &, RenderInfo *, const int64_t queue_time_ms, const std::optional< size_t > previous_count)
 
ExecutionResult executeTableFunction (const RelTableFunction *, const CompilationOptions &, const ExecutionOptions &, const int64_t queue_time_ms)
 
ExecutionResult executeFilter (const RelFilter *, const CompilationOptions &, const ExecutionOptions &, RenderInfo *, const int64_t queue_time_ms)
 
ExecutionResult executeSort (const RelSort *, const CompilationOptions &, const ExecutionOptions &, RenderInfo *, const int64_t queue_time_ms)
 
ExecutionResult executeLogicalValues (const RelLogicalValues *, const ExecutionOptions &)
 
ExecutionResult executeModify (const RelModify *modify, const ExecutionOptions &eo)
 
ExecutionResult executeUnion (const RelLogicalUnion *, const RaExecutionSequence &, const CompilationOptions &, const ExecutionOptions &, RenderInfo *, const int64_t queue_time_ms)
 
WorkUnit createSortInputWorkUnit (const RelSort *, std::list< Analyzer::OrderEntry > &order_entries, const ExecutionOptions &eo)
 
ExecutionResult executeWorkUnit (const WorkUnit &work_unit, const std::vector< TargetMetaInfo > &targets_meta, const bool is_agg, const CompilationOptions &co_in, const ExecutionOptions &eo_in, RenderInfo *, const int64_t queue_time_ms, const std::optional< size_t > previous_count=std::nullopt)
 
void computeWindow (const WorkUnit &work_unit, const CompilationOptions &co, const ExecutionOptions &eo, ColumnCacheMap &column_cache_map, const int64_t queue_time_ms)
 
std::unique_ptr
< WindowFunctionContext > 
createWindowFunctionContext (const Analyzer::WindowFunction *window_func, const std::shared_ptr< Analyzer::BinOper > &partition_key_cond, std::unordered_map< QueryPlanHash, std::shared_ptr< HashJoin >> &partition_cache, std::unordered_map< QueryPlanHash, size_t > &sorted_partition_key_ref_count_map, const WorkUnit &work_unit, const std::vector< InputTableInfo > &query_infos, const CompilationOptions &co, ColumnCacheMap &column_cache_map, std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner)
 
size_t getNDVEstimation (const WorkUnit &work_unit, const int64_t range, const bool is_agg, const CompilationOptions &co, const ExecutionOptions &eo)
 
bool hasDeletedRowInQuery (std::vector< InputTableInfo > const &) const
 
std::optional< size_t > getFilteredCountAll (const RelAlgExecutionUnit &ra_exe_unit, const bool is_agg, const CompilationOptions &co, const ExecutionOptions &eo)
 
FilterSelectivity getFilterSelectivity (const std::vector< std::shared_ptr< Analyzer::Expr >> &filter_expressions, const CompilationOptions &co, const ExecutionOptions &eo)
 
std::vector< PushedDownFilterInfo > selectFiltersToBePushedDown (const RelAlgExecutor::WorkUnit &work_unit, const CompilationOptions &co, const ExecutionOptions &eo)
 
bool isRowidLookup (const WorkUnit &work_unit)
 
ExecutionResult handleOutOfMemoryRetry (const RelAlgExecutor::WorkUnit &work_unit, ColumnCacheMap &column_cache, const std::vector< TargetMetaInfo > &targets_meta, const bool is_agg, const CompilationOptions &co, const ExecutionOptions &eo, RenderInfo *render_info, const QueryExecutionError &e, const int64_t queue_time_ms)
 
WorkUnit createWorkUnit (const RelAlgNode *, const SortInfo &, const ExecutionOptions &eo)
 
WorkUnit createCompoundWorkUnit (const RelCompound *, const SortInfo &, const ExecutionOptions &eo)
 
WorkUnit createAggregateWorkUnit (const RelAggregate *, const SortInfo &, const bool just_explain)
 
WorkUnit createProjectWorkUnit (const RelProject *, const SortInfo &, const ExecutionOptions &eo)
 
WorkUnit createFilterWorkUnit (const RelFilter *, const SortInfo &, const bool just_explain)
 
WorkUnit createJoinWorkUnit (const RelJoin *, const SortInfo &, const bool just_explain)
 
WorkUnit createUnionWorkUnit (const RelLogicalUnion *, const SortInfo &, const ExecutionOptions &eo)
 
TableFunctionWorkUnit createTableFunctionWorkUnit (const RelTableFunction *table_func, const bool just_explain, const bool is_gpu)
 
void addTemporaryTable (const int table_id, const ResultSetPtr &result)
 
void eraseFromTemporaryTables (const int table_id)
 
void handleNop (RaExecutionDesc &ed)
 
std::unordered_map< unsigned,
JoinQualsPerNestingLevel > & 
getLeftDeepJoinTreesInfo ()
 
JoinQualsPerNestingLevel translateLeftDeepJoinFilter (const RelLeftDeepInnerJoin *join, const std::vector< InputDescriptor > &input_descs, const std::unordered_map< const RelAlgNode *, int > &input_to_nest_level, const bool just_explain)
 
std::list< std::shared_ptr
< Analyzer::Expr > > 
makeJoinQuals (const RexScalar *join_condition, const std::vector< JoinType > &join_types, const std::unordered_map< const RelAlgNode *, int > &input_to_nest_level, const bool just_explain) const
 
void setHasStepForUnion (bool flag)
 
bool hasStepForUnion () const
 
bool canUseResultsetCache (const ExecutionOptions &eo, RenderInfo *render_info) const
 
void setupCaching (const RelAlgNode *ra)
 
- Private Member Functions inherited from StorageIOFacility
 StorageIOFacility (Executor *executor)
 
StorageIOFacility::UpdateCallback yieldUpdateCallback (UpdateTransactionParameters &update_parameters)
 
StorageIOFacility::UpdateCallback yieldDeleteCallback (DeleteTransactionParameters &delete_parameters)
 

Static Private Member Functions

static void handlePersistentError (const int32_t error_code)
 

Private Attributes

Executor * executor_
 
std::unique_ptr< RelAlgDag > query_dag_
 
std::shared_ptr< const
query_state::QueryState > 
query_state_
 
TemporaryTables temporary_tables_
 
time_t now_
 
std::unordered_map< unsigned,
JoinQualsPerNestingLevel > 
left_deep_join_info_
 
std::vector< std::shared_ptr
< Analyzer::Expr > > 
target_exprs_owned_
 
std::unordered_map< unsigned,
AggregatedResult > 
leaf_results_
 
int64_t queue_time_ms_
 
bool has_step_for_union_
 
std::unique_ptr
< TransactionParameters > 
dml_transaction_parameters_
 
std::optional< std::function
< void()> > 
post_execution_callback_
 

Static Private Attributes

static SpeculativeTopNBlacklist speculative_topn_blacklist_
 

Friends

class PendingExecutionClosure
 

Additional Inherited Members

- Private Types inherited from StorageIOFacility
using UpdateCallback = UpdateLogForFragment::Callback
 
using TableDescriptorType = TableDescriptor
 
using DeleteVictimOffsetList = std::vector< uint64_t >
 
using UpdateTargetOffsetList = std::vector< uint64_t >
 
using UpdateTargetTypeList = std::vector< TargetMetaInfo >
 
using UpdateTargetColumnNamesList = std::vector< std::string >
 
using TransactionLog = Fragmenter_Namespace::InsertOrderFragmenter::ModifyTransactionTracker
 
using TransactionLogPtr = std::unique_ptr< TransactionLog >
 
using ColumnValidationFunction = std::function< bool(std::string const &)>
 

Detailed Description

Definition at line 53 of file RelAlgExecutor.h.

Member Typedef Documentation

using RelAlgExecutor::TargetInfoList = std::vector< TargetInfo >

Definition at line 55 of file RelAlgExecutor.h.

Constructor & Destructor Documentation

RelAlgExecutor::RelAlgExecutor ( Executor * executor,
std::shared_ptr< const query_state::QueryState > query_state = nullptr 
)
inline

Definition at line 57 of file RelAlgExecutor.h.

References initializeParallelismHints().

59  : StorageIOFacility(executor)
60  , executor_(executor)
61  , query_state_(std::move(query_state))
62  , now_(0)
63  , queue_time_ms_(0) {
64  initializeParallelismHints();
65  }
int64_t queue_time_ms_
std::shared_ptr< const query_state::QueryState > query_state_
void initializeParallelismHints()
StorageIOFacility(Executor *executor)
Executor * executor_

+ Here is the call graph for this function:

RelAlgExecutor::RelAlgExecutor ( Executor * executor,
const std::string &  query_ra,
std::shared_ptr< const query_state::QueryState > query_state = nullptr 
)
inline

Definition at line 67 of file RelAlgExecutor.h.

References initializeParallelismHints().

70  : StorageIOFacility(executor)
71  , executor_(executor)
72  , query_dag_(RelAlgDagBuilder::buildDag(query_ra, true))
73  , query_state_(std::move(query_state))
74  , now_(0)
75  , queue_time_ms_(0) {
76  initializeParallelismHints();
77  }
int64_t queue_time_ms_
static std::unique_ptr< RelAlgDag > buildDag(const std::string &query_ra, const bool optimize_dag)
Definition: RelAlgDag.cpp:3358
std::unique_ptr< RelAlgDag > query_dag_
std::shared_ptr< const query_state::QueryState > query_state_
void initializeParallelismHints()
StorageIOFacility(Executor *executor)
Executor * executor_

+ Here is the call graph for this function:

RelAlgExecutor::RelAlgExecutor ( Executor * executor,
std::unique_ptr< RelAlgDag > query_dag,
std::shared_ptr< const query_state::QueryState > query_state = nullptr 
)
inline

Definition at line 79 of file RelAlgExecutor.h.

References initializeParallelismHints().

82  : StorageIOFacility(executor)
83  , executor_(executor)
84  , query_dag_(std::move(query_dag))
85  , query_state_(std::move(query_state))
86  , now_(0)
87  , queue_time_ms_(0) {
88  initializeParallelismHints();
89  }
int64_t queue_time_ms_
std::unique_ptr< RelAlgDag > query_dag_
std::shared_ptr< const query_state::QueryState > query_state_
void initializeParallelismHints()
StorageIOFacility(Executor *executor)
Executor * executor_

+ Here is the call graph for this function:

Member Function Documentation

void RelAlgExecutor::addLeafResult ( const unsigned  id,
const AggregatedResult & result 
)
inline

Definition at line 130 of file RelAlgExecutor.h.

References CHECK, and leaf_results_.

130  {
131  const auto it_ok = leaf_results_.emplace(id, result);
132  CHECK(it_ok.second);
133  }
std::unordered_map< unsigned, AggregatedResult > leaf_results_
#define CHECK(condition)
Definition: Logger.h:291
void RelAlgExecutor::addTemporaryTable ( const int  table_id,
const ResultSetPtr & result 
)
inline private

Definition at line 402 of file RelAlgExecutor.h.

References CHECK, CHECK_LT, and temporary_tables_.

Referenced by executeRelAlgSeq(), executeRelAlgStep(), executeSort(), executeTableFunction(), and handleNop().

402  {
403  CHECK_LT(size_t(0), result->colCount());
404  CHECK_LT(table_id, 0);
405  auto it_ok = temporary_tables_.emplace(table_id, result);
406  CHECK(it_ok.second);
407  }
TemporaryTables temporary_tables_
#define CHECK_LT(x, y)
Definition: Logger.h:303
#define CHECK(condition)
Definition: Logger.h:291

+ Here is the caller graph for this function:

bool RelAlgExecutor::canUseResultsetCache ( const ExecutionOptions & eo,
RenderInfo * render_info 
) const
private

Definition at line 483 of file RelAlgExecutor.cpp.

References g_cluster, g_enable_data_recycler, g_use_query_resultset_cache, hasStepForUnion(), is_validate_or_explain_query(), heavyai::InSituFlagsOwnerInterface::isInSitu(), and ExecutionOptions::outer_fragment_indices.

Referenced by executeRelAlgStep(), executeSort(), executeTableFunction(), and executeWorkUnit().

484  {
485  auto validate_or_explain_query = is_validate_or_explain_query(eo);
486  auto query_for_partial_outer_frag = !eo.outer_fragment_indices.empty();
488  !validate_or_explain_query && !hasStepForUnion() &&
489  !query_for_partial_outer_frag &&
490  (!render_info || (render_info && !render_info->isInSitu()));
491 }
bool g_use_query_resultset_cache
Definition: Execute.cpp:160
bool is_validate_or_explain_query(const ExecutionOptions &eo)
std::vector< size_t > outer_fragment_indices
bool g_enable_data_recycler
Definition: Execute.cpp:158
bool hasStepForUnion() const
bool g_cluster

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void RelAlgExecutor::cleanupPostExecution ( )

Definition at line 776 of file RelAlgExecutor.cpp.

References CHECK, and executor_.

Referenced by executeRelAlgQueryNoRetry(), and getOuterFragmentCount().

776  {
777  CHECK(executor_);
778  executor_->row_set_mem_owner_ = nullptr;
779 }
#define CHECK(condition)
Definition: Logger.h:291
Executor * executor_

+ Here is the caller graph for this function:

AggregatedColRange RelAlgExecutor::computeColRangesCache ( )

Definition at line 756 of file RelAlgExecutor.cpp.

References executor_, anonymous_namespace{RelAlgExecutor.cpp}::get_physical_inputs_with_spi_col_id(), and getRootRelAlgNode().

756  {
757  AggregatedColRange agg_col_range_cache;
758  const auto phys_inputs = get_physical_inputs_with_spi_col_id(&getRootRelAlgNode());
759  return executor_->computeColRangesCache(phys_inputs);
760 }
const RelAlgNode & getRootRelAlgNode() const
std::unordered_set< PhysicalInput > get_physical_inputs_with_spi_col_id(const RelAlgNode *ra)
Executor * executor_

+ Here is the call graph for this function:

StringDictionaryGenerations RelAlgExecutor::computeStringDictionaryGenerations ( )

Definition at line 762 of file RelAlgExecutor.cpp.

References executor_, anonymous_namespace{RelAlgExecutor.cpp}::get_physical_inputs_with_spi_col_id(), and getRootRelAlgNode().

762  {
763  const auto phys_inputs = get_physical_inputs_with_spi_col_id(&getRootRelAlgNode());
764  return executor_->computeStringDictionaryGenerations(phys_inputs);
765 }
const RelAlgNode & getRootRelAlgNode() const
std::unordered_set< PhysicalInput > get_physical_inputs_with_spi_col_id(const RelAlgNode *ra)
Executor * executor_

+ Here is the call graph for this function:

TableGenerations RelAlgExecutor::computeTableGenerations ( )

Definition at line 767 of file RelAlgExecutor.cpp.

References executor_, get_physical_table_inputs(), and getRootRelAlgNode().

767  {
768  const auto phys_table_ids = get_physical_table_inputs(&getRootRelAlgNode());
769  return executor_->computeTableGenerations(phys_table_ids);
770 }
std::unordered_set< shared::TableKey > get_physical_table_inputs(const RelAlgNode *ra)
const RelAlgNode & getRootRelAlgNode() const
Executor * executor_

+ Here is the call graph for this function:

void RelAlgExecutor::computeWindow ( const WorkUnit & work_unit,
const CompilationOptions & co,
const ExecutionOptions & eo,
ColumnCacheMap & column_cache_map,
const int64_t  queue_time_ms 
)
private

Definition at line 2492 of file RelAlgExecutor.cpp.

References CHECK, CHECK_EQ, WindowProjectNodeContext::create(), createWindowFunctionContext(), RelAlgExecutor::WorkUnit::exe_unit, executor_, ExecutionOptions::executor_type, Extern, get_table_infos(), Analyzer::WindowFunction::getPartitionKeys(), RelAlgExecutionUnit::input_descs, kBOOLEAN, kBW_EQ, kONE, RelAlgExecutionUnit::target_exprs, and anonymous_namespace{RelAlgExecutor.cpp}::transform_to_inner().

Referenced by executeUpdate(), and executeWorkUnit().

2496  {
2497  auto query_infos = get_table_infos(work_unit.exe_unit.input_descs, executor_);
2498  CHECK_EQ(query_infos.size(), size_t(1));
2499  if (query_infos.front().info.fragments.size() != 1) {
2500  throw std::runtime_error(
2501  "Only single fragment tables supported for window functions for now");
2502  }
2503  if (eo.executor_type == ::ExecutorType::Extern) {
2504  return;
2505  }
2506  query_infos.push_back(query_infos.front());
2507  auto window_project_node_context = WindowProjectNodeContext::create(executor_);
2508  // a query may hold multiple window functions having the same partition by condition
2509  // then after building the first hash partition we can reuse it for the rest of
2510  // the window functions
2511  // here, a cached partition can be shared via multiple window function contexts as is
2512  // but sorted partition should be copied to reuse since we use it for (intermediate)
2513  // output buffer
2514  // todo (yoonmin) : support recycler for window function computation?
2515  std::unordered_map<QueryPlanHash, std::shared_ptr<HashJoin>> partition_cache;
2516  std::unordered_map<QueryPlanHash, std::shared_ptr<std::vector<int64_t>>>
2517  sorted_partition_cache;
2518  std::unordered_map<QueryPlanHash, size_t> sorted_partition_key_ref_count_map;
2519  std::unordered_map<size_t, std::unique_ptr<WindowFunctionContext>>
2520  window_function_context_map;
2521  std::unordered_map<QueryPlanHash, AggregateTreeForWindowFraming> aggregate_tree_map;
2522  for (size_t target_index = 0; target_index < work_unit.exe_unit.target_exprs.size();
2523  ++target_index) {
2524  const auto& target_expr = work_unit.exe_unit.target_exprs[target_index];
2525  const auto window_func = dynamic_cast<const Analyzer::WindowFunction*>(target_expr);
2526  if (!window_func) {
2527  continue;
2528  }
2529  // Always use baseline layout hash tables for now, make the expression a tuple.
2530  const auto& partition_keys = window_func->getPartitionKeys();
2531  std::shared_ptr<Analyzer::BinOper> partition_key_cond;
2532  if (partition_keys.size() >= 1) {
2533  std::shared_ptr<Analyzer::Expr> partition_key_tuple;
2534  if (partition_keys.size() > 1) {
2535  partition_key_tuple = makeExpr<Analyzer::ExpressionTuple>(partition_keys);
2536  } else {
2537  CHECK_EQ(partition_keys.size(), size_t(1));
2538  partition_key_tuple = partition_keys.front();
2539  }
2540  // Creates a tautology equality with the partition expression on both sides.
2541  partition_key_cond =
2542  makeExpr<Analyzer::BinOper>(kBOOLEAN,
2543  kBW_EQ,
2544  kONE,
2545  partition_key_tuple,
2546  transform_to_inner(partition_key_tuple.get()));
2547  }
2548  auto context =
2549  createWindowFunctionContext(window_func,
2550  partition_key_cond /*nullptr if no partition key*/,
2551  partition_cache,
2552  sorted_partition_key_ref_count_map,
2553  work_unit,
2554  query_infos,
2555  co,
2556  column_cache_map,
2557  executor_->getRowSetMemoryOwner());
2558  CHECK(window_function_context_map.emplace(target_index, std::move(context)).second);
2559  }
2560 
2561  for (auto& kv : window_function_context_map) {
2562  kv.second->compute(
2563  sorted_partition_key_ref_count_map, sorted_partition_cache, aggregate_tree_map);
2564  window_project_node_context->addWindowFunctionContext(std::move(kv.second), kv.first);
2565  }
2566 }
#define CHECK_EQ(x, y)
Definition: Logger.h:301
static WindowProjectNodeContext * create(Executor *executor)
std::shared_ptr< Analyzer::Expr > transform_to_inner(const Analyzer::Expr *expr)
std::unique_ptr< WindowFunctionContext > createWindowFunctionContext(const Analyzer::WindowFunction *window_func, const std::shared_ptr< Analyzer::BinOper > &partition_key_cond, std::unordered_map< QueryPlanHash, std::shared_ptr< HashJoin >> &partition_cache, std::unordered_map< QueryPlanHash, size_t > &sorted_partition_key_ref_count_map, const WorkUnit &work_unit, const std::vector< InputTableInfo > &query_infos, const CompilationOptions &co, ColumnCacheMap &column_cache_map, std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner)
ExecutorType executor_type
Definition: sqldefs.h:74
#define CHECK(condition)
Definition: Logger.h:291
std::vector< InputTableInfo > get_table_infos(const std::vector< InputDescriptor > &input_descs, Executor *executor)
Definition: sqldefs.h:33
const std::vector< std::shared_ptr< Analyzer::Expr > > & getPartitionKeys() const
Definition: Analyzer.h:2929
Executor * executor_

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

RelAlgExecutor::WorkUnit RelAlgExecutor::createAggregateWorkUnit ( const RelAggregate * aggregate,
const SortInfo & sort_info,
const bool  just_explain 
)
private

Definition at line 4849 of file RelAlgExecutor.cpp.

References QueryPlanDagExtractor::applyLimitClauseToCacheKey(), CHECK_EQ, RegisteredQueryHint::defaults(), executor_, QueryPlanDagExtractor::extractJoinInfo(), g_default_max_groups_buffer_entry_guess, anonymous_namespace{RelAlgExecutor.cpp}::get_input_desc(), anonymous_namespace{RelAlgExecutor.cpp}::get_input_nest_levels(), anonymous_namespace{RelAlgExecutor.cpp}::get_join_type(), get_table_infos(), anonymous_namespace{RelAlgExecutor.cpp}::get_targets_meta(), RelAlgNode::getInput(), getLeftDeepJoinTreesInfo(), RelAlgNode::getOutputMetainfo(), RelAlgNode::getQueryPlanDagHash(), RelAlgNode::inputCount(), now_, query_dag_, query_state_, RelAlgNode::setOutputMetainfo(), anonymous_namespace{RelAlgExecutor.cpp}::synthesize_inputs(), target_exprs_owned_, anonymous_namespace{RelAlgExecutor.cpp}::translate_groupby_exprs(), and anonymous_namespace{RelAlgExecutor.cpp}::translate_targets().

Referenced by createWorkUnit(), and executeAggregate().

4852  {
4853  std::vector<InputDescriptor> input_descs;
4854  std::list<std::shared_ptr<const InputColDescriptor>> input_col_descs;
4855  std::vector<std::shared_ptr<RexInput>> used_inputs_owned;
4856  const auto input_to_nest_level = get_input_nest_levels(aggregate, {});
4857  std::tie(input_descs, input_col_descs, used_inputs_owned) =
4858  get_input_desc(aggregate, input_to_nest_level, {});
4859  const auto join_type = get_join_type(aggregate);
4860 
4861  RelAlgTranslator translator(
4862  query_state_, executor_, input_to_nest_level, {join_type}, now_, just_explain);
4863  CHECK_EQ(size_t(1), aggregate->inputCount());
4864  const auto source = aggregate->getInput(0);
4865  const auto& in_metainfo = source->getOutputMetainfo();
4866  const auto scalar_sources =
4867  synthesize_inputs(aggregate, size_t(0), in_metainfo, input_to_nest_level);
4868  const auto groupby_exprs = translate_groupby_exprs(aggregate, scalar_sources);
4869  std::unordered_map<size_t, SQLTypeInfo> target_exprs_type_infos;
4870  const auto target_exprs = translate_targets(target_exprs_owned_,
4871  target_exprs_type_infos,
4872  scalar_sources,
4873  groupby_exprs,
4874  aggregate,
4875  translator);
4876 
4877  const auto query_infos = get_table_infos(input_descs, executor_);
4878 
4879  const auto targets_meta = get_targets_meta(aggregate, target_exprs);
4880  aggregate->setOutputMetainfo(targets_meta);
4881  auto query_hint = RegisteredQueryHint::defaults();
4882  if (query_dag_) {
4883  auto candidate = query_dag_->getQueryHint(aggregate);
4884  if (candidate) {
4885  query_hint = *candidate;
4886  }
4887  }
4888  auto join_info = QueryPlanDagExtractor::extractJoinInfo(
4889  aggregate, std::nullopt, getLeftDeepJoinTreesInfo(), executor_);
4890  return {RelAlgExecutionUnit{input_descs,
4891  input_col_descs,
4892  {},
4893  {},
4894  {},
4895  groupby_exprs,
4896  target_exprs,
4897  target_exprs_type_infos,
4898  nullptr,
4899  sort_info,
4900  0,
4901  query_hint,
4902  QueryPlanDagExtractor::applyLimitClauseToCacheKey(
4903  aggregate->getQueryPlanDagHash(), sort_info),
4904  join_info.hash_table_plan_dag,
4905  join_info.table_id_to_node_map,
4906  false,
4907  std::nullopt,
4908  query_state_},
4909  aggregate,
4910  g_default_max_groups_buffer_entry_guess,
4911  nullptr};
4912 }
void setOutputMetainfo(std::vector< TargetMetaInfo > targets_metainfo) const
Definition: RelAlgDag.h:850
#define CHECK_EQ(x, y)
Definition: Logger.h:301
std::unordered_map< const RelAlgNode *, int > get_input_nest_levels(const RelAlgNode *ra_node, const std::vector< size_t > &input_permutation)
static ExtractedJoinInfo extractJoinInfo(const RelAlgNode *top_node, std::optional< unsigned > left_deep_tree_id, std::unordered_map< unsigned, JoinQualsPerNestingLevel > left_deep_tree_infos, Executor *executor)
std::vector< std::shared_ptr< Analyzer::Expr > > synthesize_inputs(const RelAlgNode *ra_node, const size_t nest_level, const std::vector< TargetMetaInfo > &in_metainfo, const std::unordered_map< const RelAlgNode *, int > &input_to_nest_level)
static size_t applyLimitClauseToCacheKey(size_t cache_key, SortInfo const &sort_info)
std::tuple< std::vector< InputDescriptor >, std::list< std::shared_ptr< const InputColDescriptor > >, std::vector< std::shared_ptr< RexInput > > > get_input_desc(const RA *ra_node, const std::unordered_map< const RelAlgNode *, int > &input_to_nest_level, const std::vector< size_t > &input_permutation)
std::unordered_map< unsigned, JoinQualsPerNestingLevel > & getLeftDeepJoinTreesInfo()
size_t getQueryPlanDagHash() const
Definition: RelAlgDag.h:863
std::vector< std::shared_ptr< Analyzer::Expr > > target_exprs_owned_
std::vector< Analyzer::Expr * > translate_targets(std::vector< std::shared_ptr< Analyzer::Expr >> &target_exprs_owned, std::unordered_map< size_t, SQLTypeInfo > &target_exprs_type_infos, const std::vector< std::shared_ptr< Analyzer::Expr >> &scalar_sources, const std::list< std::shared_ptr< Analyzer::Expr >> &groupby_exprs, const RelCompound *compound, const RelAlgTranslator &translator, const ExecutorType executor_type)
std::list< std::shared_ptr< Analyzer::Expr > > translate_groupby_exprs(const RelCompound *compound, const std::vector< std::shared_ptr< Analyzer::Expr >> &scalar_sources)
const RelAlgNode * getInput(const size_t idx) const
Definition: RelAlgDag.h:877
JoinType get_join_type(const RelAlgNode *ra)
std::vector< TargetMetaInfo > get_targets_meta(const RA *ra_node, const std::vector< Analyzer::Expr * > &target_exprs)
std::unique_ptr< RelAlgDag > query_dag_
static RegisteredQueryHint defaults()
Definition: QueryHint.h:379
std::shared_ptr< const query_state::QueryState > query_state_
size_t g_default_max_groups_buffer_entry_guess
Definition: Execute.cpp:118
std::vector< InputTableInfo > get_table_infos(const std::vector< InputDescriptor > &input_descs, Executor *executor)
const size_t inputCount() const
Definition: RelAlgDag.h:875
const std::vector< TargetMetaInfo > & getOutputMetainfo() const
Definition: RelAlgDag.h:865
Executor * executor_

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

RelAlgExecutor::WorkUnit RelAlgExecutor::createCompoundWorkUnit ( const RelCompound * compound,
const SortInfo & sort_info,
const ExecutionOptions & eo 
)
private

Definition at line 4513 of file RelAlgExecutor.cpp.

References QueryPlanDagExtractor::applyLimitClauseToCacheKey(), CHECK_EQ, convert_bbox_intersect_join(), RegisteredQueryHint::defaults(), anonymous_namespace{RelAlgExecutor.cpp}::do_table_reordering(), executor_, ExecutionOptions::executor_type, QueryPlanDagExtractor::extractJoinInfo(), g_default_max_groups_buffer_entry_guess, anonymous_namespace{RelAlgExecutor.cpp}::get_input_desc(), anonymous_namespace{RelAlgExecutor.cpp}::get_input_nest_levels(), anonymous_namespace{RelAlgExecutor.cpp}::get_join_type(), anonymous_namespace{RelAlgExecutor.cpp}::get_left_deep_join_input_sizes(), get_table_infos(), anonymous_namespace{RelAlgExecutor.cpp}::get_targets_meta(), RelAlgNode::getInput(), getLeftDeepJoinTreesInfo(), RelAlgNode::getQueryPlanDagHash(), anonymous_namespace{RelAlgExecutor.cpp}::has_valid_query_plan_dag(), RelAlgNode::inputCount(), ExecutionOptions::just_explain, LEFT, anonymous_namespace{RelAlgExecutor.cpp}::left_deep_join_types(), now_, shared::printContainer(), query_dag_, query_state_, anonymous_namespace{RelAlgExecutor.cpp}::rewrite_quals(), RelAlgNode::setOutputMetainfo(), RelAlgExecutionUnit::simple_quals, RelCompound::size(), ExecutionOptions::table_reordering, target_exprs_owned_, anonymous_namespace{RelAlgExecutor.cpp}::translate_groupby_exprs(), anonymous_namespace{RelAlgExecutor.cpp}::translate_quals(), anonymous_namespace{RelAlgExecutor.cpp}::translate_scalar_sources(), anonymous_namespace{RelAlgExecutor.cpp}::translate_targets(), translateLeftDeepJoinFilter(), and VLOG.

Referenced by createWorkUnit(), executeCompound(), executeDelete(), executeUpdate(), and getOuterFragmentCount().

4516  {
4517  std::vector<InputDescriptor> input_descs;
4518  std::list<std::shared_ptr<const InputColDescriptor>> input_col_descs;
4519  auto input_to_nest_level = get_input_nest_levels(compound, {});
4520  std::tie(input_descs, input_col_descs, std::ignore) =
4521  get_input_desc(compound, input_to_nest_level, {});
4522  VLOG(3) << "input_descs=" << shared::printContainer(input_descs);
4523  VLOG(3) << "input_col_descs=" << shared::printContainer(input_col_descs);
4524  auto query_infos = get_table_infos(input_descs, executor_);
4525  CHECK_EQ(size_t(1), compound->inputCount());
4526  const auto left_deep_join =
4527  dynamic_cast<const RelLeftDeepInnerJoin*>(compound->getInput(0));
4528  JoinQualsPerNestingLevel left_deep_join_quals;
4529  const auto join_types = left_deep_join ? left_deep_join_types(left_deep_join)
4530  : std::vector<JoinType>{get_join_type(compound)};
4531  std::vector<size_t> input_permutation;
4532  std::vector<size_t> left_deep_join_input_sizes;
4533  std::optional<unsigned> left_deep_tree_id;
4534  if (left_deep_join) {
4535  left_deep_tree_id = left_deep_join->getId();
4536  left_deep_join_input_sizes = get_left_deep_join_input_sizes(left_deep_join);
4537  left_deep_join_quals = translateLeftDeepJoinFilter(
4538  left_deep_join, input_descs, input_to_nest_level, eo.just_explain);
4539  if (eo.table_reordering &&
4540  std::find(join_types.begin(), join_types.end(), JoinType::LEFT) ==
4541  join_types.end()) {
4542  input_permutation = do_table_reordering(input_descs,
4543  input_col_descs,
4544  left_deep_join_quals,
4545  input_to_nest_level,
4546  compound,
4547  query_infos,
4548  executor_);
4549  input_to_nest_level = get_input_nest_levels(compound, input_permutation);
4550  std::tie(input_descs, input_col_descs, std::ignore) =
4551  get_input_desc(compound, input_to_nest_level, input_permutation);
4552  left_deep_join_quals = translateLeftDeepJoinFilter(
4553  left_deep_join, input_descs, input_to_nest_level, eo.just_explain);
4554  }
4555  }
4556  auto const bbox_intersect_qual_info = convert_bbox_intersect_join(left_deep_join_quals,
4557  input_descs,
4558  input_to_nest_level,
4559  input_permutation,
4560  input_col_descs,
4561  executor_);
4562  if (bbox_intersect_qual_info.is_reordered) {
4563  query_infos = get_table_infos(input_descs, executor_);
4564  VLOG(3) << "input_descs=" << shared::printContainer(input_descs);
4565  VLOG(3) << "input_col_descs=" << shared::printContainer(input_col_descs);
4566  }
4567  if (bbox_intersect_qual_info.has_bbox_intersect_join) {
4568  left_deep_join_quals = bbox_intersect_qual_info.join_quals;
4569  }
4570  RelAlgTranslator translator(
4571  query_state_, executor_, input_to_nest_level, join_types, now_, eo.just_explain);
4572  const auto quals_cf = translate_quals(compound, translator);
4573  const auto quals = rewrite_quals(quals_cf.quals);
4574  const auto scalar_sources =
4575  translate_scalar_sources(compound, translator, eo.executor_type);
4576  const auto groupby_exprs = translate_groupby_exprs(compound, scalar_sources);
4577  std::unordered_map<size_t, SQLTypeInfo> target_exprs_type_infos;
4578  const auto target_exprs = translate_targets(target_exprs_owned_,
4579  target_exprs_type_infos,
4580  scalar_sources,
4581  groupby_exprs,
4582  compound,
4583  translator,
4584  eo.executor_type);
4585 
4586  auto query_hint = RegisteredQueryHint::defaults();
4587  if (query_dag_) {
4588  auto candidate = query_dag_->getQueryHint(compound);
4589  if (candidate) {
4590  query_hint = *candidate;
4591  }
4592  }
4593  CHECK_EQ(compound->size(), target_exprs.size());
4594  const RelAlgExecutionUnit exe_unit = {input_descs,
4595  input_col_descs,
4596  quals_cf.simple_quals,
4597  quals,
4598  left_deep_join_quals,
4599  groupby_exprs,
4600  target_exprs,
4601  target_exprs_type_infos,
4602  nullptr,
4603  sort_info,
4604  0,
4605  query_hint,
4606  QueryPlanDagExtractor::applyLimitClauseToCacheKey(
4607  compound->getQueryPlanDagHash(), sort_info),
4608  {},
4609  {},
4610  false,
4611  std::nullopt,
4612  query_state_};
4613  auto query_rewriter = std::make_unique<QueryRewriter>(query_infos, executor_);
4614  auto rewritten_exe_unit = query_rewriter->rewrite(exe_unit);
4615  const auto targets_meta = get_targets_meta(compound, rewritten_exe_unit.target_exprs);
4616  compound->setOutputMetainfo(targets_meta);
4617  auto& left_deep_trees_info = getLeftDeepJoinTreesInfo();
4618  if (left_deep_tree_id && left_deep_tree_id.has_value()) {
4619  left_deep_trees_info.emplace(left_deep_tree_id.value(),
4620  rewritten_exe_unit.join_quals);
4621  }
4622  if (has_valid_query_plan_dag(compound)) {
4623  auto join_info = QueryPlanDagExtractor::extractJoinInfo(
4624  compound, left_deep_tree_id, left_deep_trees_info, executor_);
4625  rewritten_exe_unit.hash_table_build_plan_dag = join_info.hash_table_plan_dag;
4626  rewritten_exe_unit.table_id_to_node_map = join_info.table_id_to_node_map;
4627  }
4628  return {rewritten_exe_unit,
4629  compound,
4630  g_default_max_groups_buffer_entry_guess,
4631  std::move(query_rewriter),
4632  input_permutation,
4633  left_deep_join_input_sizes};
4634 }
void setOutputMetainfo(std::vector< TargetMetaInfo > targets_metainfo) const
Definition: RelAlgDag.h:850
#define CHECK_EQ(x, y)
Definition: Logger.h:301
std::unordered_map< const RelAlgNode *, int > get_input_nest_levels(const RelAlgNode *ra_node, const std::vector< size_t > &input_permutation)
std::vector< JoinType > left_deep_join_types(const RelLeftDeepInnerJoin *left_deep_join)
JoinType
Definition: sqldefs.h:238
bool has_valid_query_plan_dag(const RelAlgNode *node)
static ExtractedJoinInfo extractJoinInfo(const RelAlgNode *top_node, std::optional< unsigned > left_deep_tree_id, std::unordered_map< unsigned, JoinQualsPerNestingLevel > left_deep_tree_infos, Executor *executor)
std::vector< size_t > do_table_reordering(std::vector< InputDescriptor > &input_descs, std::list< std::shared_ptr< const InputColDescriptor >> &input_col_descs, const JoinQualsPerNestingLevel &left_deep_join_quals, std::unordered_map< const RelAlgNode *, int > &input_to_nest_level, const RA *node, const std::vector< InputTableInfo > &query_infos, const Executor *executor)
static size_t applyLimitClauseToCacheKey(size_t cache_key, SortInfo const &sort_info)
std::tuple< std::vector< InputDescriptor >, std::list< std::shared_ptr< const InputColDescriptor > >, std::vector< std::shared_ptr< RexInput > > > get_input_desc(const RA *ra_node, const std::unordered_map< const RelAlgNode *, int > &input_to_nest_level, const std::vector< size_t > &input_permutation)
size_t size() const override
Definition: RelAlgDag.h:2094
std::unordered_map< unsigned, JoinQualsPerNestingLevel > & getLeftDeepJoinTreesInfo()
std::vector< JoinCondition > JoinQualsPerNestingLevel
size_t getQueryPlanDagHash() const
Definition: RelAlgDag.h:863
std::vector< std::shared_ptr< Analyzer::Expr > > target_exprs_owned_
ExecutorType executor_type
std::vector< Analyzer::Expr * > translate_targets(std::vector< std::shared_ptr< Analyzer::Expr >> &target_exprs_owned, std::unordered_map< size_t, SQLTypeInfo > &target_exprs_type_infos, const std::vector< std::shared_ptr< Analyzer::Expr >> &scalar_sources, const std::list< std::shared_ptr< Analyzer::Expr >> &groupby_exprs, const RelCompound *compound, const RelAlgTranslator &translator, const ExecutorType executor_type)
std::list< std::shared_ptr< Analyzer::Expr > > translate_groupby_exprs(const RelCompound *compound, const std::vector< std::shared_ptr< Analyzer::Expr >> &scalar_sources)
const RelAlgNode * getInput(const size_t idx) const
Definition: RelAlgDag.h:877
JoinType get_join_type(const RelAlgNode *ra)
BoundingBoxIntersectJoinTranslationInfo convert_bbox_intersect_join(JoinQualsPerNestingLevel const &join_quals, std::vector< InputDescriptor > &input_descs, std::unordered_map< const RelAlgNode *, int > &input_to_nest_level, std::vector< size_t > &input_permutation, std::list< std::shared_ptr< const InputColDescriptor >> &input_col_desc, Executor const *executor)
std::vector< TargetMetaInfo > get_targets_meta(const RA *ra_node, const std::vector< Analyzer::Expr * > &target_exprs)
std::unique_ptr< RelAlgDag > query_dag_
static RegisteredQueryHint defaults()
Definition: QueryHint.h:379
std::shared_ptr< const query_state::QueryState > query_state_
std::vector< std::shared_ptr< Analyzer::Expr > > translate_scalar_sources(const RA *ra_node, const RelAlgTranslator &translator, const ::ExecutorType executor_type)
size_t g_default_max_groups_buffer_entry_guess
Definition: Execute.cpp:118
std::vector< InputTableInfo > get_table_infos(const std::vector< InputDescriptor > &input_descs, Executor *executor)
QualsConjunctiveForm translate_quals(const RelCompound *compound, const RelAlgTranslator &translator)
PrintContainer< CONTAINER > printContainer(CONTAINER &container)
Definition: misc.h:108
const size_t inputCount() const
Definition: RelAlgDag.h:875
Executor * executor_
#define VLOG(n)
Definition: Logger.h:388
std::list< std::shared_ptr< Analyzer::Expr > > simple_quals
JoinQualsPerNestingLevel translateLeftDeepJoinFilter(const RelLeftDeepInnerJoin *join, const std::vector< InputDescriptor > &input_descs, const std::unordered_map< const RelAlgNode *, int > &input_to_nest_level, const bool just_explain)
std::vector< size_t > get_left_deep_join_input_sizes(const RelLeftDeepInnerJoin *left_deep_join)
std::list< std::shared_ptr< Analyzer::Expr > > rewrite_quals(const std::list< std::shared_ptr< Analyzer::Expr >> &quals)

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

RelAlgExecutor::WorkUnit RelAlgExecutor::createFilterWorkUnit ( const RelFilter * filter,
const SortInfo & sort_info,
const bool  just_explain 
)
private

Definition at line 5400 of file RelAlgExecutor.cpp.

References QueryPlanDagExtractor::applyLimitClauseToCacheKey(), CHECK_EQ, RegisteredQueryHint::defaults(), executor_, QueryPlanDagExtractor::extractJoinInfo(), fold_expr(), g_default_max_groups_buffer_entry_guess, anonymous_namespace{RelAlgExecutor.cpp}::get_input_desc(), anonymous_namespace{RelAlgExecutor.cpp}::get_input_nest_levels(), anonymous_namespace{RelAlgExecutor.cpp}::get_inputs_meta(), anonymous_namespace{RelAlgExecutor.cpp}::get_join_type(), anonymous_namespace{RelAlgExecutor.cpp}::get_raw_pointers(), get_table_infos(), RelFilter::getCondition(), getLeftDeepJoinTreesInfo(), RelAlgNode::getQueryPlanDagHash(), RelAlgNode::inputCount(), now_, query_dag_, query_state_, rewrite_expr(), RelAlgNode::setOutputMetainfo(), and target_exprs_owned_.

Referenced by createWorkUnit(), and executeFilter().

5402  {
5403  CHECK_EQ(size_t(1), filter->inputCount());
5404  std::vector<InputDescriptor> input_descs;
5405  std::list<std::shared_ptr<const InputColDescriptor>> input_col_descs;
5406  std::vector<TargetMetaInfo> in_metainfo;
5407  std::vector<std::shared_ptr<RexInput>> used_inputs_owned;
5408  std::vector<std::shared_ptr<Analyzer::Expr>> target_exprs_owned;
5409 
5410  const auto input_to_nest_level = get_input_nest_levels(filter, {});
5411  std::tie(input_descs, input_col_descs, used_inputs_owned) =
5412  get_input_desc(filter, input_to_nest_level, {});
5413  const auto join_type = get_join_type(filter);
5414  RelAlgTranslator translator(
5415  query_state_, executor_, input_to_nest_level, {join_type}, now_, just_explain);
5416  std::tie(in_metainfo, target_exprs_owned) =
5417  get_inputs_meta(filter, translator, used_inputs_owned, input_to_nest_level);
5418  const auto filter_expr = translator.translate(filter->getCondition());
5419  const auto query_infos = get_table_infos(input_descs, executor_);
5420 
5421  const auto qual = fold_expr(filter_expr.get());
5422  target_exprs_owned_.insert(
5423  target_exprs_owned_.end(), target_exprs_owned.begin(), target_exprs_owned.end());
5424 
5425  const auto target_exprs = get_raw_pointers(target_exprs_owned);
5426  filter->setOutputMetainfo(in_metainfo);
5427  const auto rewritten_qual = rewrite_expr(qual.get());
5428  auto query_hint = RegisteredQueryHint::defaults();
5429  if (query_dag_) {
5430  auto candidate = query_dag_->getQueryHint(filter);
5431  if (candidate) {
5432  query_hint = *candidate;
5433  }
5434  }
5435  auto join_info = QueryPlanDagExtractor::extractJoinInfo(
5436  filter, std::nullopt, getLeftDeepJoinTreesInfo(), executor_);
5437  return {{input_descs,
5438  input_col_descs,
5439  {},
5440  {rewritten_qual ? rewritten_qual : qual},
5441  {},
5442  {nullptr},
5443  target_exprs,
5444  {},
5445  nullptr,
5446  sort_info,
5447  0,
5448  query_hint,
5449  QueryPlanDagExtractor::applyLimitClauseToCacheKey(
5450  filter->getQueryPlanDagHash(), sort_info),
5451  join_info.hash_table_plan_dag,
5452  join_info.table_id_to_node_map},
5453  filter,
5454  g_default_max_groups_buffer_entry_guess,
5455  nullptr};
5456 }
void setOutputMetainfo(std::vector< TargetMetaInfo > targets_metainfo) const
Definition: RelAlgDag.h:850
#define CHECK_EQ(x, y)
Definition: Logger.h:301
std::vector< Analyzer::Expr * > get_raw_pointers(std::vector< std::shared_ptr< Analyzer::Expr >> const &input)
std::unordered_map< const RelAlgNode *, int > get_input_nest_levels(const RelAlgNode *ra_node, const std::vector< size_t > &input_permutation)
static ExtractedJoinInfo extractJoinInfo(const RelAlgNode *top_node, std::optional< unsigned > left_deep_tree_id, std::unordered_map< unsigned, JoinQualsPerNestingLevel > left_deep_tree_infos, Executor *executor)
static size_t applyLimitClauseToCacheKey(size_t cache_key, SortInfo const &sort_info)
std::tuple< std::vector< InputDescriptor >, std::list< std::shared_ptr< const InputColDescriptor > >, std::vector< std::shared_ptr< RexInput > > > get_input_desc(const RA *ra_node, const std::unordered_map< const RelAlgNode *, int > &input_to_nest_level, const std::vector< size_t > &input_permutation)
std::pair< std::vector< TargetMetaInfo >, std::vector< std::shared_ptr< Analyzer::Expr > > > get_inputs_meta(const RelFilter *filter, const RelAlgTranslator &translator, const std::vector< std::shared_ptr< RexInput >> &inputs_owned, const std::unordered_map< const RelAlgNode *, int > &input_to_nest_level)
const RexScalar * getCondition() const
Definition: RelAlgDag.h:1898
std::unordered_map< unsigned, JoinQualsPerNestingLevel > & getLeftDeepJoinTreesInfo()
Analyzer::ExpressionPtr rewrite_expr(const Analyzer::Expr *expr)
size_t getQueryPlanDagHash() const
Definition: RelAlgDag.h:863
std::vector< std::shared_ptr< Analyzer::Expr > > target_exprs_owned_
JoinType get_join_type(const RelAlgNode *ra)
std::unique_ptr< RelAlgDag > query_dag_
static RegisteredQueryHint defaults()
Definition: QueryHint.h:379
std::shared_ptr< const query_state::QueryState > query_state_
size_t g_default_max_groups_buffer_entry_guess
Definition: Execute.cpp:118
std::vector< InputTableInfo > get_table_infos(const std::vector< InputDescriptor > &input_descs, Executor *executor)
const size_t inputCount() const
Definition: RelAlgDag.h:875
Executor * executor_
std::shared_ptr< Analyzer::Expr > fold_expr(const Analyzer::Expr *expr)

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

WorkUnit RelAlgExecutor::createJoinWorkUnit ( const RelJoin * ,
const SortInfo & ,
const bool  just_explain 
)
private
RelAlgExecutor::WorkUnit RelAlgExecutor::createProjectWorkUnit ( const RelProject * project,
const SortInfo & sort_info,
const ExecutionOptions & eo 
)
private

Definition at line 4914 of file RelAlgExecutor.cpp.

References QueryPlanDagExtractor::applyLimitClauseToCacheKey(), convert_bbox_intersect_join(), RegisteredQueryHint::defaults(), anonymous_namespace{RelAlgExecutor.cpp}::do_table_reordering(), executor_, ExecutionOptions::executor_type, QueryPlanDagExtractor::extractJoinInfo(), g_default_max_groups_buffer_entry_guess, anonymous_namespace{RelAlgExecutor.cpp}::get_input_desc(), anonymous_namespace{RelAlgExecutor.cpp}::get_input_nest_levels(), anonymous_namespace{RelAlgExecutor.cpp}::get_join_type(), anonymous_namespace{RelAlgExecutor.cpp}::get_left_deep_join_input_sizes(), anonymous_namespace{RelAlgExecutor.cpp}::get_raw_pointers(), get_table_infos(), anonymous_namespace{RelAlgExecutor.cpp}::get_targets_meta(), RelAlgNode::getInput(), getLeftDeepJoinTreesInfo(), RelAlgNode::getQueryPlanDagHash(), anonymous_namespace{RelAlgExecutor.cpp}::has_valid_query_plan_dag(), ExecutionOptions::just_explain, anonymous_namespace{RelAlgExecutor.cpp}::left_deep_join_types(), now_, shared::printContainer(), query_dag_, query_state_, RelAlgNode::setOutputMetainfo(), ExecutionOptions::table_reordering, target_exprs_owned_, anonymous_namespace{RelAlgExecutor.cpp}::translate_scalar_sources(), translateLeftDeepJoinFilter(), and VLOG.

Referenced by createWorkUnit(), executeDelete(), executeProject(), executeUpdate(), and getOuterFragmentCount().

4917  {
4918  std::vector<InputDescriptor> input_descs;
4919  std::list<std::shared_ptr<const InputColDescriptor>> input_col_descs;
4920  auto input_to_nest_level = get_input_nest_levels(project, {});
4921  std::tie(input_descs, input_col_descs, std::ignore) =
4922  get_input_desc(project, input_to_nest_level, {});
4923  VLOG(3) << "input_descs=" << shared::printContainer(input_descs);
4924  VLOG(3) << "input_col_descs=" << shared::printContainer(input_col_descs);
4925  auto query_infos = get_table_infos(input_descs, executor_);
4926  const auto left_deep_join =
4927  dynamic_cast<const RelLeftDeepInnerJoin*>(project->getInput(0));
4928  JoinQualsPerNestingLevel left_deep_join_quals;
4929  const auto join_types = left_deep_join ? left_deep_join_types(left_deep_join)
4930  : std::vector<JoinType>{get_join_type(project)};
4931  std::vector<size_t> input_permutation;
4932  std::vector<size_t> left_deep_join_input_sizes;
4933  std::optional<unsigned> left_deep_tree_id;
4934  if (left_deep_join) {
4935  left_deep_tree_id = left_deep_join->getId();
4936  left_deep_join_input_sizes = get_left_deep_join_input_sizes(left_deep_join);
4937  left_deep_join_quals = translateLeftDeepJoinFilter(
4938  left_deep_join, input_descs, input_to_nest_level, eo.just_explain);
4939  if (eo.table_reordering) {
4940  input_permutation = do_table_reordering(input_descs,
4941  input_col_descs,
4942  left_deep_join_quals,
4943  input_to_nest_level,
4944  project,
4945  query_infos,
4946  executor_);
4947  input_to_nest_level = get_input_nest_levels(project, input_permutation);
4948  std::tie(input_descs, input_col_descs, std::ignore) =
4949  get_input_desc(project, input_to_nest_level, input_permutation);
4950  left_deep_join_quals = translateLeftDeepJoinFilter(
4951  left_deep_join, input_descs, input_to_nest_level, eo.just_explain);
4952  }
4953  }
4954  auto const bbox_intersect_qual_info = convert_bbox_intersect_join(left_deep_join_quals,
4955  input_descs,
4956  input_to_nest_level,
4957  input_permutation,
4958  input_col_descs,
4959  executor_);
4960  if (bbox_intersect_qual_info.is_reordered) {
4961  query_infos = get_table_infos(input_descs, executor_);
4962  VLOG(3) << "input_descs=" << shared::printContainer(input_descs);
4963  VLOG(3) << "input_col_descs=" << shared::printContainer(input_col_descs);
4964  }
4965  if (bbox_intersect_qual_info.has_bbox_intersect_join) {
4966  left_deep_join_quals = bbox_intersect_qual_info.join_quals;
4967  }
4968  RelAlgTranslator translator(
4969  query_state_, executor_, input_to_nest_level, join_types, now_, eo.just_explain);
4970  const auto target_exprs_owned =
4971  translate_scalar_sources(project, translator, eo.executor_type);
4972 
4973  target_exprs_owned_.insert(
4974  target_exprs_owned_.end(), target_exprs_owned.begin(), target_exprs_owned.end());
4975  const auto target_exprs = get_raw_pointers(target_exprs_owned);
4976  auto query_hint = RegisteredQueryHint::defaults();
4977  if (query_dag_) {
4978  auto candidate = query_dag_->getQueryHint(project);
4979  if (candidate) {
4980  query_hint = *candidate;
4981  }
4982  }
4983  const RelAlgExecutionUnit exe_unit = {input_descs,
4984  input_col_descs,
4985  {},
4986  {},
4987  left_deep_join_quals,
4988  {nullptr},
4989  target_exprs,
4990  {},
4991  nullptr,
4992  sort_info,
4993  0,
4994  query_hint,
4995  QueryPlanDagExtractor::applyLimitClauseToCacheKey(
4996  project->getQueryPlanDagHash(), sort_info),
4997  {},
4998  {},
4999  false,
5000  std::nullopt,
5001  query_state_};
5002  auto query_rewriter = std::make_unique<QueryRewriter>(query_infos, executor_);
5003  auto rewritten_exe_unit = query_rewriter->rewrite(exe_unit);
5004  const auto targets_meta = get_targets_meta(project, rewritten_exe_unit.target_exprs);
5005  project->setOutputMetainfo(targets_meta);
5006  auto& left_deep_trees_info = getLeftDeepJoinTreesInfo();
5007  if (left_deep_tree_id && left_deep_tree_id.has_value()) {
5008  left_deep_trees_info.emplace(left_deep_tree_id.value(),
5009  rewritten_exe_unit.join_quals);
5010  }
5011  if (has_valid_query_plan_dag(project)) {
5012  auto join_info = QueryPlanDagExtractor::extractJoinInfo(
5013  project, left_deep_tree_id, left_deep_trees_info, executor_);
5014  rewritten_exe_unit.hash_table_build_plan_dag = join_info.hash_table_plan_dag;
5015  rewritten_exe_unit.table_id_to_node_map = join_info.table_id_to_node_map;
5016  }
5017  return {rewritten_exe_unit,
5018  project,
5019  g_default_max_groups_buffer_entry_guess,
5020  std::move(query_rewriter),
5021  input_permutation,
5022  left_deep_join_input_sizes};
5023 }
void setOutputMetainfo(std::vector< TargetMetaInfo > targets_metainfo) const
Definition: RelAlgDag.h:850
std::vector< Analyzer::Expr * > get_raw_pointers(std::vector< std::shared_ptr< Analyzer::Expr >> const &input)
std::unordered_map< const RelAlgNode *, int > get_input_nest_levels(const RelAlgNode *ra_node, const std::vector< size_t > &input_permutation)
std::vector< JoinType > left_deep_join_types(const RelLeftDeepInnerJoin *left_deep_join)
JoinType
Definition: sqldefs.h:238
bool has_valid_query_plan_dag(const RelAlgNode *node)
static ExtractedJoinInfo extractJoinInfo(const RelAlgNode *top_node, std::optional< unsigned > left_deep_tree_id, std::unordered_map< unsigned, JoinQualsPerNestingLevel > left_deep_tree_infos, Executor *executor)
std::vector< size_t > do_table_reordering(std::vector< InputDescriptor > &input_descs, std::list< std::shared_ptr< const InputColDescriptor >> &input_col_descs, const JoinQualsPerNestingLevel &left_deep_join_quals, std::unordered_map< const RelAlgNode *, int > &input_to_nest_level, const RA *node, const std::vector< InputTableInfo > &query_infos, const Executor *executor)
static size_t applyLimitClauseToCacheKey(size_t cache_key, SortInfo const &sort_info)
std::tuple< std::vector< InputDescriptor >, std::list< std::shared_ptr< const InputColDescriptor > >, std::vector< std::shared_ptr< RexInput > > > get_input_desc(const RA *ra_node, const std::unordered_map< const RelAlgNode *, int > &input_to_nest_level, const std::vector< size_t > &input_permutation)
std::unordered_map< unsigned, JoinQualsPerNestingLevel > & getLeftDeepJoinTreesInfo()
std::vector< JoinCondition > JoinQualsPerNestingLevel
size_t getQueryPlanDagHash() const
Definition: RelAlgDag.h:863
std::vector< std::shared_ptr< Analyzer::Expr > > target_exprs_owned_
ExecutorType executor_type
const RelAlgNode * getInput(const size_t idx) const
Definition: RelAlgDag.h:877
JoinType get_join_type(const RelAlgNode *ra)
BoundingBoxIntersectJoinTranslationInfo convert_bbox_intersect_join(JoinQualsPerNestingLevel const &join_quals, std::vector< InputDescriptor > &input_descs, std::unordered_map< const RelAlgNode *, int > &input_to_nest_level, std::vector< size_t > &input_permutation, std::list< std::shared_ptr< const InputColDescriptor >> &input_col_desc, Executor const *executor)
std::vector< TargetMetaInfo > get_targets_meta(const RA *ra_node, const std::vector< Analyzer::Expr * > &target_exprs)
std::unique_ptr< RelAlgDag > query_dag_
static RegisteredQueryHint defaults()
Definition: QueryHint.h:379
std::shared_ptr< const query_state::QueryState > query_state_
std::vector< std::shared_ptr< Analyzer::Expr > > translate_scalar_sources(const RA *ra_node, const RelAlgTranslator &translator, const ::ExecutorType executor_type)
size_t g_default_max_groups_buffer_entry_guess
Definition: Execute.cpp:118
std::vector< InputTableInfo > get_table_infos(const std::vector< InputDescriptor > &input_descs, Executor *executor)
PrintContainer< CONTAINER > printContainer(CONTAINER &container)
Definition: misc.h:108
Executor * executor_
#define VLOG(n)
Definition: Logger.h:388
JoinQualsPerNestingLevel translateLeftDeepJoinFilter(const RelLeftDeepInnerJoin *join, const std::vector< InputDescriptor > &input_descs, const std::unordered_map< const RelAlgNode *, int > &input_to_nest_level, const bool just_explain)
std::vector< size_t > get_left_deep_join_input_sizes(const RelLeftDeepInnerJoin *left_deep_join)

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

RelAlgExecutor::WorkUnit RelAlgExecutor::createSortInputWorkUnit ( const RelSort * sort,
std::list< Analyzer::OrderEntry > &  order_entries,
const ExecutionOptions & eo 
)
private

Definition at line 3394 of file RelAlgExecutor.cpp.

References CHECK_GT, RelSort::collationCount(), SpeculativeTopNBlacklist::contains(), createWorkUnit(), Default, anonymous_namespace{RelAlgExecutor.cpp}::first_oe_is_desc(), g_default_max_groups_buffer_entry_guess, anonymous_namespace{RelAlgExecutor.cpp}::get_scan_limit(), get_target_info(), RelAlgNode::getInput(), RelSort::getLimit(), RelSort::getOffset(), RelAlgExecutionUnit::input_descs, RelAlgNode::setOutputMetainfo(), speculative_topn_blacklist_, SpeculativeTopN, and StreamingTopN.

Referenced by executeRelAlgQuerySingleStep(), and executeSort().

3397  {
3398  const auto source = sort->getInput(0);
3399  auto limit = sort->getLimit();
3400  const size_t offset = sort->getOffset();
3401  const size_t scan_limit = sort->collationCount() ? 0 : get_scan_limit(source, limit);
3402  const size_t scan_total_limit =
3403  scan_limit ? get_scan_limit(source, scan_limit + offset) : 0;
3404  size_t max_groups_buffer_entry_guess{
3405  scan_total_limit ? scan_total_limit : g_default_max_groups_buffer_entry_guess};
3406  SortAlgorithm sort_algorithm{SortAlgorithm::SpeculativeTopN};
3407  SortInfo sort_info{order_entries, sort_algorithm, limit, offset};
3408  auto source_work_unit = createWorkUnit(source, sort_info, eo);
3409  const auto& source_exe_unit = source_work_unit.exe_unit;
3410 
3411  // we do not allow sorting geometry or array types
3412  for (auto order_entry : order_entries) {
3413  CHECK_GT(order_entry.tle_no, 0); // tle_no is a 1-base index
3414  const auto& te = source_exe_unit.target_exprs[order_entry.tle_no - 1];
3415  const auto& ti = get_target_info(te, false);
3416  if (ti.sql_type.is_geometry() || ti.sql_type.is_array()) {
3417  throw std::runtime_error(
3418  "Columns with geometry or array types cannot be used in an ORDER BY clause.");
3419  }
3420  }
3421 
3422  if (source_exe_unit.groupby_exprs.size() == 1) {
3423  if (!source_exe_unit.groupby_exprs.front()) {
3424  sort_algorithm = SortAlgorithm::StreamingTopN;
3425  } else {
3426  if (speculative_topn_blacklist_.contains(source_exe_unit.groupby_exprs.front(),
3427  first_oe_is_desc(order_entries))) {
3428  sort_algorithm = SortAlgorithm::Default;
3429  }
3430  }
3431  }
3432 
3433  sort->setOutputMetainfo(source->getOutputMetainfo());
3434  // NB: the `body` field of the returned `WorkUnit` needs to be the `source` node,
3435  // not the `sort`. The aggregator needs the pre-sorted result from leaves.
3436  return {RelAlgExecutionUnit{source_exe_unit.input_descs,
3437  std::move(source_exe_unit.input_col_descs),
3438  source_exe_unit.simple_quals,
3439  source_exe_unit.quals,
3440  source_exe_unit.join_quals,
3441  source_exe_unit.groupby_exprs,
3442  source_exe_unit.target_exprs,
3443  source_exe_unit.target_exprs_original_type_infos,
3444  nullptr,
3445  {sort_info.order_entries, sort_algorithm, limit, offset},
3446  scan_total_limit,
3447  source_exe_unit.query_hint,
3448  source_exe_unit.query_plan_dag_hash,
3449  source_exe_unit.hash_table_build_plan_dag,
3450  source_exe_unit.table_id_to_node_map,
3451  source_exe_unit.use_bump_allocator,
3452  source_exe_unit.union_all,
3453  source_exe_unit.query_state},
3454  source,
3455  max_groups_buffer_entry_guess,
3456  std::move(source_work_unit.query_rewriter),
3457  source_work_unit.input_permutation,
3458  source_work_unit.left_deep_join_input_sizes};
3459 }
void setOutputMetainfo(std::vector< TargetMetaInfo > targets_metainfo) const
Definition: RelAlgDag.h:850
size_t getOffset() const
Definition: RelAlgDag.h:2228
bool contains(const std::shared_ptr< Analyzer::Expr > expr, const bool desc) const
static SpeculativeTopNBlacklist speculative_topn_blacklist_
std::vector< InputDescriptor > input_descs
#define CHECK_GT(x, y)
Definition: Logger.h:305
TargetInfo get_target_info(const Analyzer::Expr *target_expr, const bool bigint_count)
Definition: TargetInfo.h:92
WorkUnit createWorkUnit(const RelAlgNode *, const SortInfo &, const ExecutionOptions &eo)
const RelAlgNode * getInput(const size_t idx) const
Definition: RelAlgDag.h:877
size_t get_scan_limit(const RelAlgNode *ra, std::optional< size_t > limit)
size_t collationCount() const
Definition: RelAlgDag.h:2211
size_t g_default_max_groups_buffer_entry_guess
Definition: Execute.cpp:118
std::optional< size_t > getLimit() const
Definition: RelAlgDag.h:2226
bool first_oe_is_desc(const std::list< Analyzer::OrderEntry > &order_entries)

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

RelAlgExecutor::TableFunctionWorkUnit RelAlgExecutor::createTableFunctionWorkUnit ( const RelTableFunction * table_func,
const bool  just_explain,
const bool  is_gpu 
)
private

Definition at line 5156 of file RelAlgExecutor.cpp.

References bind_table_function(), CHECK, CHECK_EQ, CHECK_GT, CHECK_LT, RelTableFunction::countRexLiteralArgs(), DEFAULT_ROW_MULTIPLIER_VALUE, executor_, ext_arg_type_to_type_info(), anonymous_namespace{RelAlgExecutor.cpp}::get_input_desc(), anonymous_namespace{RelAlgExecutor.cpp}::get_input_nest_levels(), anonymous_namespace{RelAlgExecutor.cpp}::get_raw_pointers(), get_table_infos(), anonymous_namespace{RelAlgExecutor.cpp}::get_targets_meta(), RelTableFunction::getColInputsSize(), RelTableFunction::getFunctionName(), RelTableFunction::getTableFuncInputAt(), IS_GEO, kENCODING_ARRAY, kENCODING_NONE, kINT, shared::StringDictKey::kTransientDictKey, LOG, now_, query_state_, RelAlgNode::setOutputMetainfo(), TableFunctions, TableFunctionExecutionUnit::target_exprs, target_exprs_owned_, to_string(), TRANSIENT_DICT_ID, anonymous_namespace{RelAlgExecutor.cpp}::translate_scalar_sources(), UNREACHABLE, and logger::WARNING.

Referenced by executeTableFunction().

5159  {
5160  std::vector<InputDescriptor> input_descs;
5161  std::list<std::shared_ptr<const InputColDescriptor>> input_col_descs;
5162  auto input_to_nest_level = get_input_nest_levels(rel_table_func, {});
5163  std::tie(input_descs, input_col_descs, std::ignore) =
5164  get_input_desc(rel_table_func, input_to_nest_level, {});
5165  const auto query_infos = get_table_infos(input_descs, executor_);
5166  RelAlgTranslator translator(
5167  query_state_, executor_, input_to_nest_level, {}, now_, just_explain);
5168  auto input_exprs_owned = translate_scalar_sources(
5169  rel_table_func, translator, ::ExecutorType::TableFunctions);
5170  target_exprs_owned_.insert(
5171  target_exprs_owned_.end(), input_exprs_owned.begin(), input_exprs_owned.end());
5172 
5173  const auto table_function_impl_and_type_infos = [=]() {
5174  if (is_gpu) {
5175  try {
5176  return bind_table_function(
5177  rel_table_func->getFunctionName(), input_exprs_owned, is_gpu);
5178  } catch (ExtensionFunctionBindingError& e) {
5179  LOG(WARNING) << "createTableFunctionWorkUnit[GPU]: " << e.what()
5180  << " Redirecting " << rel_table_func->getFunctionName()
5181  << " step to run on CPU.";
5182  throw QueryMustRunOnCpu();
5183  }
5184  } else {
5185  try {
5186  return bind_table_function(
5187  rel_table_func->getFunctionName(), input_exprs_owned, is_gpu);
5188  } catch (ExtensionFunctionBindingError& e) {
5189  LOG(WARNING) << "createTableFunctionWorkUnit[CPU]: " << e.what();
5190  throw;
5191  }
5192  }
5193  }();
5194  const auto& table_function_impl = std::get<0>(table_function_impl_and_type_infos);
5195  const auto& table_function_type_infos = std::get<1>(table_function_impl_and_type_infos);
5196  size_t output_row_sizing_param = 0;
5197  if (table_function_impl
5198  .hasUserSpecifiedOutputSizeParameter()) { // constant and row multiplier
5199  const auto parameter_index =
5200  table_function_impl.getOutputRowSizeParameter(table_function_type_infos);
5201  CHECK_GT(parameter_index, size_t(0));
5202  if (rel_table_func->countRexLiteralArgs() == table_function_impl.countScalarArgs()) {
5203  const auto parameter_expr =
5204  rel_table_func->getTableFuncInputAt(parameter_index - 1);
5205  const auto parameter_expr_literal = dynamic_cast<const RexLiteral*>(parameter_expr);
5206  if (!parameter_expr_literal) {
5207  throw std::runtime_error(
5208  "Provided output buffer sizing parameter is not a literal. Only literal "
5209  "values are supported with output buffer sizing configured table "
5210  "functions.");
5211  }
5212  int64_t literal_val = parameter_expr_literal->getVal<int64_t>();
5213  if (literal_val < 0) {
5214  throw std::runtime_error("Provided output sizing parameter " +
5215  std::to_string(literal_val) +
5216  " must be positive integer.");
5217  }
5218  output_row_sizing_param = static_cast<size_t>(literal_val);
5219  } else {
5220  // RowMultiplier not specified in the SQL query. Set it to 1
5221  output_row_sizing_param = 1; // default value for RowMultiplier
5222  static Datum d = {DEFAULT_ROW_MULTIPLIER_VALUE};
5223  static Analyzer::ExpressionPtr DEFAULT_ROW_MULTIPLIER_EXPR =
5224  makeExpr<Analyzer::Constant>(kINT, false, d);
5225  // Push the constant 1 to input_exprs
5226  input_exprs_owned.insert(input_exprs_owned.begin() + parameter_index - 1,
5227  DEFAULT_ROW_MULTIPLIER_EXPR);
5228  }
5229  } else if (table_function_impl.hasNonUserSpecifiedOutputSize()) {
5230  output_row_sizing_param = table_function_impl.getOutputRowSizeParameter();
5231  } else {
5232  UNREACHABLE();
5233  }
5234 
5235  std::vector<Analyzer::ColumnVar*> input_col_exprs;
5236  size_t input_index = 0;
5237  size_t arg_index = 0;
5238  const auto table_func_args = table_function_impl.getInputArgs();
5239  CHECK_EQ(table_func_args.size(), table_function_type_infos.size());
5240  for (const auto& ti : table_function_type_infos) {
5241  if (ti.is_column_list()) {
5242  for (int i = 0; i < ti.get_dimension(); i++) {
5243  auto& input_expr = input_exprs_owned[input_index];
5244  auto col_var = dynamic_cast<Analyzer::ColumnVar*>(input_expr.get());
5245  CHECK(col_var);
5246 
5247  // avoid setting type info to ti here since ti doesn't have all the
5248  // properties correctly set
5249  auto type_info = input_expr->get_type_info();
5250  if (ti.is_column_array()) {
5251  type_info.set_compression(kENCODING_ARRAY);
5252  type_info.set_subtype(type_info.get_subtype()); // set type to be subtype
5253  } else {
5254  type_info.set_subtype(type_info.get_type()); // set type to be subtype
5255  }
5256  type_info.set_type(ti.get_type()); // set type to column list
5257  type_info.set_dimension(ti.get_dimension());
5258  type_info.setUsesFlatBuffer(type_info.get_elem_type().supportsFlatBuffer());
5259  input_expr->set_type_info(type_info);
5260 
5261  input_col_exprs.push_back(col_var);
5262  input_index++;
5263  }
5264  } else if (ti.is_column()) {
5265  auto& input_expr = input_exprs_owned[input_index];
5266  auto col_var = dynamic_cast<Analyzer::ColumnVar*>(input_expr.get());
5267  CHECK(col_var);
5268  // same here! avoid setting type info to ti since it doesn't have all the
5269  // properties correctly set
5270  auto type_info = input_expr->get_type_info();
5271  if (ti.is_column_array()) {
5272  type_info.set_compression(kENCODING_ARRAY);
5273  type_info.set_subtype(type_info.get_subtype()); // set type to be subtype
5274  } else {
5275  type_info.set_subtype(type_info.get_type()); // set type to be subtype
5276  }
5277  type_info.set_type(ti.get_type()); // set type to column
5278  type_info.setUsesFlatBuffer(type_info.get_elem_type().supportsFlatBuffer());
5279  input_expr->set_type_info(type_info);
5280  input_col_exprs.push_back(col_var);
5281  input_index++;
5282  } else {
5283  auto input_expr = input_exprs_owned[input_index];
5284  auto ext_func_arg_ti = ext_arg_type_to_type_info(table_func_args[arg_index]);
5285  if (ext_func_arg_ti != input_expr->get_type_info()) {
5286  input_exprs_owned[input_index] = input_expr->add_cast(ext_func_arg_ti);
5287  }
5288  input_index++;
5289  }
5290  arg_index++;
5291  }
5292  CHECK_EQ(input_col_exprs.size(), rel_table_func->getColInputsSize());
5293  std::vector<Analyzer::Expr*> table_func_outputs;
5294  constexpr int32_t transient_pos{-1};
5295  for (size_t i = 0; i < table_function_impl.getOutputsSize(); i++) {
5296  auto ti = table_function_impl.getOutputSQLType(i);
5297  ti.setUsesFlatBuffer(ti.supportsFlatBuffer());
5298  if (ti.is_geometry()) {
5299  auto p = table_function_impl.getInputID(i);
5300  int32_t input_pos = p.first;
5301  if (input_pos != transient_pos) {
5302  CHECK(!ti.is_column_list()); // see QE-472
5303  CHECK_LT(input_pos, input_exprs_owned.size());
5304  const auto& reference_ti = input_exprs_owned[input_pos]->get_type_info();
5305  CHECK(IS_GEO(reference_ti.get_type()) || IS_GEO(reference_ti.get_subtype()));
5306  ti.set_input_srid(reference_ti.get_input_srid());
5307  ti.set_output_srid(reference_ti.get_output_srid());
5308  ti.set_compression(reference_ti.get_compression());
5309  ti.set_comp_param(reference_ti.get_comp_param());
5310  } else {
5311  ti.set_input_srid(0);
5312  ti.set_output_srid(0);
5313  ti.set_compression(kENCODING_NONE);
5314  ti.set_comp_param(0);
5315  }
5316  } else if (ti.is_dict_encoded_string() || ti.is_text_encoding_dict_array()) {
5317  auto p = table_function_impl.getInputID(i);
5318 
5319  int32_t input_pos = p.first;
5320  if (input_pos == transient_pos) {
5321  ti.set_comp_param(TRANSIENT_DICT_ID);
5322  ti.setStringDictKey(shared::StringDictKey::kTransientDictKey);
5323  } else {
5324  // Iterate over the list of arguments to compute the offset. Use this offset to
5325  // get the corresponding input
5326  int32_t offset = 0;
5327  for (int j = 0; j < input_pos; j++) {
5328  const auto ti = table_function_type_infos[j];
5329  offset += ti.is_column_list() ? ti.get_dimension() : 1;
5330  }
5331  input_pos = offset + p.second;
5332 
5333  CHECK_LT(input_pos, input_exprs_owned.size());
5334  const auto& dict_key =
5335  input_exprs_owned[input_pos]->get_type_info().getStringDictKey();
5336  ti.set_comp_param(dict_key.dict_id);
5337  ti.setStringDictKey(dict_key);
5338  }
5339  }
5340  target_exprs_owned_.push_back(std::make_shared<Analyzer::ColumnVar>(
5341  ti, shared::ColumnKey{0, 0, int32_t(i)}, -1));
5342  table_func_outputs.push_back(target_exprs_owned_.back().get());
5343  }
5344  auto input_exprs = get_raw_pointers(input_exprs_owned);
5345  const TableFunctionExecutionUnit exe_unit = {
5346  input_descs,
5347  input_col_descs,
5348  input_exprs, // table function inputs
5349  input_col_exprs, // table function column inputs (duplicates w/ above)
5350  table_func_outputs, // table function projected exprs
5351  output_row_sizing_param, // output buffer sizing param
5352  table_function_impl};
5353  const auto targets_meta = get_targets_meta(rel_table_func, exe_unit.target_exprs);
5354  rel_table_func->setOutputMetainfo(targets_meta);
5355  return {exe_unit, rel_table_func};
5356 }
#define CHECK_EQ(x, y)
Definition: Logger.h:301
std::vector< Analyzer::Expr * > get_raw_pointers(std::vector< std::shared_ptr< Analyzer::Expr >> const &input)
std::unordered_map< const RelAlgNode *, int > get_input_nest_levels(const RelAlgNode *ra_node, const std::vector< size_t > &input_permutation)
std::tuple< std::vector< InputDescriptor >, std::list< std::shared_ptr< const InputColDescriptor > >, std::vector< std::shared_ptr< RexInput > > > get_input_desc(const RA *ra_node, const std::unordered_map< const RelAlgNode *, int > &input_to_nest_level, const std::vector< size_t > &input_permutation)
#define LOG(tag)
Definition: Logger.h:285
#define UNREACHABLE()
Definition: Logger.h:338
std::shared_ptr< Analyzer::Expr > ExpressionPtr
Definition: Analyzer.h:184
#define TRANSIENT_DICT_ID
Definition: DbObjectKeys.h:24
#define CHECK_GT(x, y)
Definition: Logger.h:305
std::string to_string(char const *&&v)
std::vector< std::shared_ptr< Analyzer::Expr > > target_exprs_owned_
std::vector< TargetMetaInfo > get_targets_meta(const RA *ra_node, const std::vector< Analyzer::Expr * > &target_exprs)
#define CHECK_LT(x, y)
Definition: Logger.h:303
#define DEFAULT_ROW_MULTIPLIER_VALUE
std::shared_ptr< const query_state::QueryState > query_state_
std::vector< std::shared_ptr< Analyzer::Expr > > translate_scalar_sources(const RA *ra_node, const RelAlgTranslator &translator, const ::ExecutorType executor_type)
const std::tuple< table_functions::TableFunction, std::vector< SQLTypeInfo > > bind_table_function(std::string name, Analyzer::ExpressionPtrVector input_args, const std::vector< table_functions::TableFunction > &table_funcs, const bool is_gpu)
static const StringDictKey kTransientDictKey
Definition: DbObjectKeys.h:45
#define CHECK(condition)
Definition: Logger.h:291
std::vector< InputTableInfo > get_table_infos(const std::vector< InputDescriptor > &input_descs, Executor *executor)
Definition: sqltypes.h:72
std::vector< Analyzer::Expr * > target_exprs
Definition: Datum.h:71
Executor * executor_
#define IS_GEO(T)
Definition: sqltypes.h:310
SQLTypeInfo ext_arg_type_to_type_info(const ExtArgumentType ext_arg_type)

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

RelAlgExecutor::WorkUnit RelAlgExecutor::createUnionWorkUnit ( const RelLogicalUnion logical_union,
const SortInfo sort_info,
const ExecutionOptions eo 
)
private

Definition at line 5049 of file RelAlgExecutor.cpp.

References gpu_enabled::accumulate(), shared::append_move(), CHECK, RelRexToStringConfig::defaults(), RegisteredQueryHint::defaults(), EMPTY_HASHED_PLAN_DAG_KEY, executor_, g_default_max_groups_buffer_entry_guess, anonymous_namespace{RelAlgExecutor.cpp}::get_input_desc(), anonymous_namespace{RelAlgExecutor.cpp}::get_input_nest_levels(), anonymous_namespace{RelAlgExecutor.cpp}::get_raw_pointers(), get_table_infos(), anonymous_namespace{RelAlgExecutor.cpp}::get_targets_meta(), RelAlgNode::getInput(), RelAlgNode::getOutputMetainfo(), RelLogicalUnion::isAll(), shared::printContainer(), query_state_, RelAlgNode::setOutputMetainfo(), anonymous_namespace{RelAlgExecutor.cpp}::target_exprs_for_union(), target_exprs_owned_, RelAlgNode::toString(), and VLOG.

Referenced by executeUnion().

5052  {
5053  std::vector<InputDescriptor> input_descs;
5054  std::list<std::shared_ptr<const InputColDescriptor>> input_col_descs;
5055  // Map ra input ptr to index (0, 1).
5056  auto input_to_nest_level = get_input_nest_levels(logical_union, {});
5057  std::tie(input_descs, input_col_descs, std::ignore) =
5058  get_input_desc(logical_union, input_to_nest_level, {});
5059  const auto query_infos = get_table_infos(input_descs, executor_);
5060  auto const max_num_tuples =
5061  std::accumulate(query_infos.cbegin(),
5062  query_infos.cend(),
5063  size_t(0),
5064  [](auto max, auto const& query_info) {
5065  return std::max(max, query_info.info.getNumTuples());
5066  });
5067 
5068  VLOG(3) << "input_to_nest_level.size()=" << input_to_nest_level.size() << " Pairs are:";
5069  for (auto& pair : input_to_nest_level) {
5070  VLOG(3) << " (" << pair.first->toString(RelRexToStringConfig::defaults()) << ", "
5071  << pair.second << ')';
5072  }
5073 
5074  // For UNION queries, we need to keep the target_exprs from both subqueries since they
5075  // may differ on StringDictionaries.
5076  std::vector<Analyzer::Expr*> target_exprs_pair[2];
5077  for (unsigned i = 0; i < 2; ++i) {
5078  auto input_exprs_owned = target_exprs_for_union(logical_union->getInput(i));
5079  CHECK(!input_exprs_owned.empty())
5080  << "No metainfo found for input node(" << i << ") "
5081  << logical_union->getInput(i)->toString(RelRexToStringConfig::defaults());
5082  VLOG(3) << "i(" << i << ") input_exprs_owned.size()=" << input_exprs_owned.size();
5083  for (auto& input_expr : input_exprs_owned) {
5084  VLOG(3) << " " << input_expr->toString();
5085  }
5086  target_exprs_pair[i] = get_raw_pointers(input_exprs_owned);
5087  shared::append_move(target_exprs_owned_, std::move(input_exprs_owned));
5088  }
5089 
5090  VLOG(3) << "input_descs=" << shared::printContainer(input_descs)
5091  << " input_col_descs=" << shared::printContainer(input_col_descs)
5092  << " target_exprs.size()=" << target_exprs_pair[0].size()
5093  << " max_num_tuples=" << max_num_tuples;
5094 
5095  const RelAlgExecutionUnit exe_unit = {input_descs,
5096  input_col_descs,
5097  {}, // quals_cf.simple_quals,
5098  {}, // rewrite_quals(quals_cf.quals),
5099  {},
5100  {nullptr},
5101  target_exprs_pair[0],
5102  {},
5103  nullptr,
5104  sort_info,
5105  max_num_tuples,
5108  {},
5109  {},
5110  false,
5111  logical_union->isAll(),
5112  query_state_,
5113  target_exprs_pair[1]};
5114  auto query_rewriter = std::make_unique<QueryRewriter>(query_infos, executor_);
5115  const auto rewritten_exe_unit = query_rewriter->rewrite(exe_unit);
5116 
5117  RelAlgNode const* input0 = logical_union->getInput(0);
5118  if (auto const* node = dynamic_cast<const RelCompound*>(input0)) {
5119  logical_union->setOutputMetainfo(
5120  get_targets_meta(node, rewritten_exe_unit.target_exprs));
5121  } else if (auto const* node = dynamic_cast<const RelProject*>(input0)) {
5122  logical_union->setOutputMetainfo(
5123  get_targets_meta(node, rewritten_exe_unit.target_exprs));
5124  } else if (auto const* node = dynamic_cast<const RelLogicalUnion*>(input0)) {
5125  logical_union->setOutputMetainfo(
5126  get_targets_meta(node, rewritten_exe_unit.target_exprs));
5127  } else if (auto const* node = dynamic_cast<const RelAggregate*>(input0)) {
5128  logical_union->setOutputMetainfo(
5129  get_targets_meta(node, rewritten_exe_unit.target_exprs));
5130  } else if (auto const* node = dynamic_cast<const RelScan*>(input0)) {
5131  logical_union->setOutputMetainfo(
5132  get_targets_meta(node, rewritten_exe_unit.target_exprs));
5133  } else if (auto const* node = dynamic_cast<const RelFilter*>(input0)) {
5134  logical_union->setOutputMetainfo(
5135  get_targets_meta(node, rewritten_exe_unit.target_exprs));
5136  } else if (auto const* node = dynamic_cast<const RelLogicalValues*>(input0)) {
5137  logical_union->setOutputMetainfo(
5138  get_targets_meta(node, rewritten_exe_unit.target_exprs));
5139  } else if (dynamic_cast<const RelSort*>(input0)) {
5140  throw QueryNotSupported("LIMIT and OFFSET are not currently supported with UNION.");
5141  } else {
5142  throw QueryNotSupported("Unsupported input type: " +
5144  }
5145  VLOG(3) << "logical_union->getOutputMetainfo()="
5146  << shared::printContainer(logical_union->getOutputMetainfo())
5147  << " rewritten_exe_unit.input_col_descs.front()->getScanDesc().getTableKey()="
5148  << rewritten_exe_unit.input_col_descs.front()->getScanDesc().getTableKey();
5149 
5150  return {rewritten_exe_unit,
5151  logical_union,
5153  std::move(query_rewriter)};
5154 }
bool isAll() const
Definition: RelAlgDag.h:2770
void setOutputMetainfo(std::vector< TargetMetaInfo > targets_metainfo) const
Definition: RelAlgDag.h:850
std::vector< Analyzer::Expr * > get_raw_pointers(std::vector< std::shared_ptr< Analyzer::Expr >> const &input)
std::unordered_map< const RelAlgNode *, int > get_input_nest_levels(const RelAlgNode *ra_node, const std::vector< size_t > &input_permutation)
std::tuple< std::vector< InputDescriptor >, std::list< std::shared_ptr< const InputColDescriptor > >, std::vector< std::shared_ptr< RexInput > > > get_input_desc(const RA *ra_node, const std::unordered_map< const RelAlgNode *, int > &input_to_nest_level, const std::vector< size_t > &input_permutation)
constexpr QueryPlanHash EMPTY_HASHED_PLAN_DAG_KEY
size_t append_move(std::vector< T > &destination, std::vector< T > &&source)
Definition: misc.h:78
std::vector< std::shared_ptr< Analyzer::Expr > > target_exprs_owned_
const RelAlgNode * getInput(const size_t idx) const
Definition: RelAlgDag.h:877
DEVICE auto accumulate(ARGS &&...args)
Definition: gpu_enabled.h:42
std::vector< std::shared_ptr< Analyzer::Expr > > target_exprs_for_union(RelAlgNode const *input_node)
virtual std::string toString(RelRexToStringConfig config=RelRexToStringConfig::defaults()) const =0
std::vector< TargetMetaInfo > get_targets_meta(const RA *ra_node, const std::vector< Analyzer::Expr * > &target_exprs)
static RegisteredQueryHint defaults()
Definition: QueryHint.h:379
std::shared_ptr< const query_state::QueryState > query_state_
static RelRexToStringConfig defaults()
Definition: RelAlgDag.h:78
size_t g_default_max_groups_buffer_entry_guess
Definition: Execute.cpp:118
#define CHECK(condition)
Definition: Logger.h:291
std::vector< InputTableInfo > get_table_infos(const std::vector< InputDescriptor > &input_descs, Executor *executor)
PrintContainer< CONTAINER > printContainer(CONTAINER &container)
Definition: misc.h:108
const std::vector< TargetMetaInfo > & getOutputMetainfo() const
Definition: RelAlgDag.h:865
Executor * executor_
#define VLOG(n)
Definition: Logger.h:388

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

std::unique_ptr< WindowFunctionContext > RelAlgExecutor::createWindowFunctionContext ( const Analyzer::WindowFunction window_func,
const std::shared_ptr< Analyzer::BinOper > &  partition_key_cond,
std::unordered_map< QueryPlanHash, std::shared_ptr< HashJoin >> &  partition_cache,
std::unordered_map< QueryPlanHash, size_t > &  sorted_partition_key_ref_count_map,
const WorkUnit work_unit,
const std::vector< InputTableInfo > &  query_infos,
const CompilationOptions co,
ColumnCacheMap column_cache_map,
std::shared_ptr< RowSetMemoryOwner row_set_mem_owner 
)
private

Definition at line 2568 of file RelAlgExecutor.cpp.

References RegisteredQueryHint::aggregate_tree_fanout, QueryPlanDagExtractor::applyLimitClauseToCacheKey(), RelAlgExecutor::WorkUnit::body, CHECK, CHECK_EQ, Data_Namespace::CPU_LEVEL, CompilationOptions::device_type, RelAlgExecutor::WorkUnit::exe_unit, executor_, g_window_function_aggregation_tree_fanout, Analyzer::WindowFunction::getArgs(), Analyzer::WindowFunction::getCollation(), ColumnFetcher::getOneColumnFragment(), Analyzer::WindowFunction::getOrderKeys(), RelAlgNode::getQueryPlanDagHash(), GPU, Data_Namespace::GPU_LEVEL, RelAlgExecutionUnit::hash_table_build_plan_dag, hash_value(), Analyzer::WindowFunction::isFrameNavigateWindowFunction(), OneToMany, RelAlgExecutionUnit::query_hint, RelAlgExecutionUnit::sort_info, RelAlgExecutionUnit::table_id_to_node_map, VLOG, WINDOW_FUNCTION, and WINDOW_FUNCTION_FRAMING.

Referenced by computeWindow().

2577  {
2578  const size_t elem_count = query_infos.front().info.fragments.front().getNumTuples();
2579  const auto memory_level = co.device_type == ExecutorDeviceType::GPU
2582  std::unique_ptr<WindowFunctionContext> context;
2583  auto partition_cache_key = QueryPlanDagExtractor::applyLimitClauseToCacheKey(
2584  work_unit.body->getQueryPlanDagHash(), work_unit.exe_unit.sort_info);
2585  JoinType window_partition_type = window_func->isFrameNavigateWindowFunction()
2588  if (partition_key_cond) {
2589  auto partition_cond_str = partition_key_cond->toString();
2590  auto partition_key_hash = boost::hash_value(partition_cond_str);
2591  boost::hash_combine(partition_cache_key, partition_key_hash);
2592  boost::hash_combine(partition_cache_key, static_cast<int>(window_partition_type));
2593  std::shared_ptr<HashJoin> partition_ptr;
2594  auto cached_hash_table_it = partition_cache.find(partition_cache_key);
2595  if (cached_hash_table_it != partition_cache.end()) {
2596  partition_ptr = cached_hash_table_it->second;
2597  VLOG(1) << "Reuse a hash table to compute window function context (key: "
2598  << partition_cache_key << ", partition condition: " << partition_cond_str
2599  << ")";
2600  } else {
2601  const auto hash_table_or_err = executor_->buildHashTableForQualifier(
2602  partition_key_cond,
2603  query_infos,
2604  memory_level,
2605  window_partition_type,
2607  column_cache_map,
2608  work_unit.exe_unit.hash_table_build_plan_dag,
2609  work_unit.exe_unit.query_hint,
2610  work_unit.exe_unit.table_id_to_node_map);
2611  if (!hash_table_or_err.fail_reason.empty()) {
2612  throw std::runtime_error(hash_table_or_err.fail_reason);
2613  }
2614  CHECK(hash_table_or_err.hash_table->getHashType() == HashType::OneToMany);
2615  partition_ptr = hash_table_or_err.hash_table;
2616  CHECK(partition_cache.insert(std::make_pair(partition_cache_key, partition_ptr))
2617  .second);
2618  VLOG(1) << "Put a generated hash table for computing window function context to "
2619  "cache (key: "
2620  << partition_cache_key << ", partition condition: " << partition_cond_str
2621  << ")";
2622  }
2623  CHECK(partition_ptr);
2624  auto aggregate_tree_fanout = g_window_function_aggregation_tree_fanout;
2625  if (work_unit.exe_unit.query_hint.aggregate_tree_fanout != aggregate_tree_fanout) {
2626  aggregate_tree_fanout = work_unit.exe_unit.query_hint.aggregate_tree_fanout;
2627  VLOG(1) << "Aggregate tree's fanout is set to " << aggregate_tree_fanout;
2628  }
2629  context = std::make_unique<WindowFunctionContext>(window_func,
2630  partition_cache_key,
2631  partition_ptr,
2632  elem_count,
2633  co.device_type,
2634  row_set_mem_owner,
2635  aggregate_tree_fanout);
2636  } else {
2637  context = std::make_unique<WindowFunctionContext>(
2638  window_func, elem_count, co.device_type, row_set_mem_owner);
2639  }
2640  const auto& order_keys = window_func->getOrderKeys();
2641  if (!order_keys.empty()) {
2642  auto sorted_partition_cache_key = partition_cache_key;
2643  for (auto& order_key : order_keys) {
2644  boost::hash_combine(sorted_partition_cache_key, order_key->toString());
2645  }
2646  for (auto& collation : window_func->getCollation()) {
2647  boost::hash_combine(sorted_partition_cache_key, collation.toString());
2648  }
2649  context->setSortedPartitionCacheKey(sorted_partition_cache_key);
2650  auto cache_key_cnt_it =
2651  sorted_partition_key_ref_count_map.try_emplace(sorted_partition_cache_key, 1);
2652  if (!cache_key_cnt_it.second) {
2653  sorted_partition_key_ref_count_map[sorted_partition_cache_key] =
2654  cache_key_cnt_it.first->second + 1;
2655  }
2656 
2657  std::vector<std::shared_ptr<Chunk_NS::Chunk>> chunks_owner;
2658  for (const auto& order_key : order_keys) {
2659  const auto order_col =
2660  std::dynamic_pointer_cast<const Analyzer::ColumnVar>(order_key);
2661  if (!order_col) {
2662  throw std::runtime_error("Only order by columns supported for now");
2663  }
2664  auto const [column, col_elem_count] =
2666  *order_col,
2667  query_infos.front().info.fragments.front(),
2668  memory_level,
2669  0,
2670  nullptr,
2671  /*thread_idx=*/0,
2672  chunks_owner,
2673  column_cache_map);
2674 
2675  CHECK_EQ(col_elem_count, elem_count);
2676  context->addOrderColumn(column, order_col->get_type_info(), chunks_owner);
2677  }
2678  }
2679  if (context->getWindowFunction()->hasFraming() ||
2680  context->getWindowFunction()->isMissingValueFillingFunction()) {
2681  // todo (yoonmin) : if we try to support generic window function expression without
2682  // extra project node, we need to revisit here b/c the current logic assumes that
2683  // window function expression has a single input source
2684  auto& window_function_expression_args = window_func->getArgs();
2685  std::vector<std::shared_ptr<Chunk_NS::Chunk>> chunks_owner;
2686  for (auto& expr : window_function_expression_args) {
2687  if (const auto arg_col_var =
2688  std::dynamic_pointer_cast<const Analyzer::ColumnVar>(expr)) {
2689  auto const [column, col_elem_count] = ColumnFetcher::getOneColumnFragment(
2690  executor_,
2691  *arg_col_var,
2692  query_infos.front().info.fragments.front(),
2693  memory_level,
2694  0,
2695  nullptr,
2696  /*thread_idx=*/0,
2697  chunks_owner,
2698  column_cache_map);
2699  CHECK_EQ(col_elem_count, elem_count);
2700  context->addColumnBufferForWindowFunctionExpression(column, chunks_owner);
2701  }
2702  }
2703  }
2704  return context;
2705 }
#define CHECK_EQ(x, y)
Definition: Logger.h:301
JoinType
Definition: sqldefs.h:238
bool isFrameNavigateWindowFunction() const
Definition: Analyzer.h:2979
static size_t applyLimitClauseToCacheKey(size_t cache_key, SortInfo const &sort_info)
const std::vector< std::shared_ptr< Analyzer::Expr > > & getOrderKeys() const
Definition: Analyzer.h:2933
const std::vector< OrderEntry > & getCollation() const
Definition: Analyzer.h:2951
size_t g_window_function_aggregation_tree_fanout
const std::vector< std::shared_ptr< Analyzer::Expr > > & getArgs() const
Definition: Analyzer.h:2927
ExecutorDeviceType device_type
static std::pair< const int8_t *, size_t > getOneColumnFragment(Executor *executor, const Analyzer::ColumnVar &hash_col, const Fragmenter_Namespace::FragmentInfo &fragment, const Data_Namespace::MemoryLevel effective_mem_lvl, const int device_id, DeviceAllocator *device_allocator, const size_t thread_idx, std::vector< std::shared_ptr< Chunk_NS::Chunk >> &chunks_owner, ColumnCacheMap &column_cache)
Gets one chunk's pointer and element count on either CPU or GPU.
std::size_t hash_value(RexAbstractInput const &rex_ab_input)
Definition: RelAlgDag.cpp:3548
#define CHECK(condition)
Definition: Logger.h:291
Executor * executor_
#define VLOG(n)
Definition: Logger.h:388

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

RelAlgExecutor::WorkUnit RelAlgExecutor::createWorkUnit ( const RelAlgNode node,
const SortInfo sort_info,
const ExecutionOptions eo 
)
private

Definition at line 4332 of file RelAlgExecutor.cpp.

References createAggregateWorkUnit(), createCompoundWorkUnit(), createFilterWorkUnit(), createProjectWorkUnit(), RelRexToStringConfig::defaults(), logger::FATAL, ExecutionOptions::just_explain, LOG, and RelAlgNode::toString().

Referenced by createSortInputWorkUnit(), and getJoinInfo().

4334  {
4335  const auto compound = dynamic_cast<const RelCompound*>(node);
4336  if (compound) {
4337  return createCompoundWorkUnit(compound, sort_info, eo);
4338  }
4339  const auto project = dynamic_cast<const RelProject*>(node);
4340  if (project) {
4341  return createProjectWorkUnit(project, sort_info, eo);
4342  }
4343  const auto aggregate = dynamic_cast<const RelAggregate*>(node);
4344  if (aggregate) {
4345  return createAggregateWorkUnit(aggregate, sort_info, eo.just_explain);
4346  }
4347  const auto filter = dynamic_cast<const RelFilter*>(node);
4348  if (filter) {
4349  return createFilterWorkUnit(filter, sort_info, eo.just_explain);
4350  }
4351  LOG(FATAL) << "Unhandled node type: "
4353  return {};
4354 }
WorkUnit createProjectWorkUnit(const RelProject *, const SortInfo &, const ExecutionOptions &eo)
WorkUnit createAggregateWorkUnit(const RelAggregate *, const SortInfo &, const bool just_explain)
#define LOG(tag)
Definition: Logger.h:285
WorkUnit createCompoundWorkUnit(const RelCompound *, const SortInfo &, const ExecutionOptions &eo)
virtual std::string toString(RelRexToStringConfig config=RelRexToStringConfig::defaults()) const =0
static RelRexToStringConfig defaults()
Definition: RelAlgDag.h:78
WorkUnit createFilterWorkUnit(const RelFilter *, const SortInfo &, const bool just_explain)

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void RelAlgExecutor::eraseFromTemporaryTables ( const int  table_id)
inline private

Definition at line 409 of file RelAlgExecutor.h.

References temporary_tables_.

409 { temporary_tables_.erase(table_id); }
TemporaryTables temporary_tables_
ExecutionResult RelAlgExecutor::executeAggregate ( const RelAggregate aggregate,
const CompilationOptions co,
const ExecutionOptions eo,
RenderInfo render_info,
const int64_t  queue_time_ms 
)
private

Definition at line 2302 of file RelAlgExecutor.cpp.

References createAggregateWorkUnit(), DEBUG_TIMER, executeWorkUnit(), RelAlgNode::getOutputMetainfo(), and ExecutionOptions::just_explain.

Referenced by executeRelAlgStep().

2306  {
2307  auto timer = DEBUG_TIMER(__func__);
2308  const auto work_unit = createAggregateWorkUnit(aggregate, SortInfo(), eo.just_explain);
2309  return executeWorkUnit(work_unit,
2310  aggregate->getOutputMetainfo(),
2311  true,
2312  co,
2313  eo,
2314  render_info,
2315  queue_time_ms);
2316 }
WorkUnit createAggregateWorkUnit(const RelAggregate *, const SortInfo &, const bool just_explain)
ExecutionResult executeWorkUnit(const WorkUnit &work_unit, const std::vector< TargetMetaInfo > &targets_meta, const bool is_agg, const CompilationOptions &co_in, const ExecutionOptions &eo_in, RenderInfo *, const int64_t queue_time_ms, const std::optional< size_t > previous_count=std::nullopt)
#define DEBUG_TIMER(name)
Definition: Logger.h:412
const std::vector< TargetMetaInfo > & getOutputMetainfo() const
Definition: RelAlgDag.h:865

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

ExecutionResult RelAlgExecutor::executeCompound ( const RelCompound * compound,
const CompilationOptions & co,
const ExecutionOptions & eo,
RenderInfo * render_info,
const int64_t queue_time_ms
)
private

Definition at line 2285 of file RelAlgExecutor.cpp.

References createCompoundWorkUnit(), DEBUG_TIMER, executeWorkUnit(), RelAlgNode::getOutputMetainfo(), and RelCompound::isAggregate().

Referenced by executeRelAlgStep().

2289  {
2290  auto timer = DEBUG_TIMER(__func__);
2291  const auto work_unit = createCompoundWorkUnit(compound, SortInfo(), eo);
2292  CompilationOptions co_compound = co;
2293  return executeWorkUnit(work_unit,
2294  compound->getOutputMetainfo(),
2295  compound->isAggregate(),
2296  co_compound,
2297  eo,
2298  render_info,
2299  queue_time_ms);
2300 }
WorkUnit createCompoundWorkUnit(const RelCompound *, const SortInfo &, const ExecutionOptions &eo)
ExecutionResult executeWorkUnit(const WorkUnit &work_unit, const std::vector< TargetMetaInfo > &targets_meta, const bool is_agg, const CompilationOptions &co_in, const ExecutionOptions &eo_in, RenderInfo *, const int64_t queue_time_ms, const std::optional< size_t > previous_count=std::nullopt)
bool isAggregate() const
Definition: RelAlgDag.h:2123
#define DEBUG_TIMER(name)
Definition: Logger.h:412
const std::vector< TargetMetaInfo > & getOutputMetainfo() const
Definition: RelAlgDag.h:865

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void RelAlgExecutor::executeDelete ( const RelAlgNode * node,
const CompilationOptions & co,
const ExecutionOptions & eo_in,
const int64_t queue_time_ms
)
private

Definition at line 2180 of file RelAlgExecutor.cpp.

References CHECK, CHECK_EQ, Executor::clearExternalCaches(), createCompoundWorkUnit(), createProjectWorkUnit(), DEBUG_TIMER, RelRexToStringConfig::defaults(), dml_transaction_parameters_, executor_, CompilationOptions::filter_on_deleted_column, get_table_infos(), get_temporary_table(), QueryExecutionError::getErrorCode(), getErrorMessageFromCode(), CompilationOptions::makeCpuOnly(), post_execution_callback_, table_is_temporary(), temporary_tables_, and StorageIOFacility::yieldDeleteCallback().

Referenced by executeRelAlgStep().

2183  {
2184  CHECK(node);
2185  auto timer = DEBUG_TIMER(__func__);
2186 
2187  auto execute_delete_for_node = [this, &co, &eo_in](const auto node,
2188  auto& work_unit,
2189  const bool is_aggregate) {
2190  auto* table_descriptor = node->getModifiedTableDescriptor();
2191  CHECK(table_descriptor);
2192  if (!table_descriptor->hasDeletedCol) {
2193  throw std::runtime_error(
2194  "DELETE queries are only supported on tables with the vacuum attribute set to "
2195  "'delayed'");
2196  }
2197 
2198  const auto catalog = node->getModifiedTableCatalog();
2199  CHECK(catalog);
2200  Executor::clearExternalCaches(false, table_descriptor, catalog->getDatabaseId());
2201 
2202  const auto table_infos = get_table_infos(work_unit.exe_unit, executor_);
2203 
2204  auto execute_delete_ra_exe_unit =
2205  [this, &table_infos, &table_descriptor, &eo_in, &co, catalog](
2206  const auto& exe_unit, const bool is_aggregate) {
2207  dml_transaction_parameters_ =
2208  std::make_unique<DeleteTransactionParameters>(table_descriptor, *catalog);
2209  auto delete_params = dynamic_cast<DeleteTransactionParameters*>(
2210  dml_transaction_parameters_.get());
2211  CHECK(delete_params);
2212  auto delete_callback = yieldDeleteCallback(*delete_params);
2213  CompilationOptions co_delete = CompilationOptions::makeCpuOnly(co);
2214 
2215  auto eo = eo_in;
2216  if (dml_transaction_parameters_->tableIsTemporary()) {
2217  eo.output_columnar_hint = true;
2218  co_delete.filter_on_deleted_column =
2219  false; // project the entire delete column for columnar update
2220  } else {
2221  CHECK_EQ(exe_unit.target_exprs.size(), size_t(1));
2222  }
2223 
2224  try {
2225  auto table_update_metadata =
2226  executor_->executeUpdate(exe_unit,
2227  table_infos,
2228  table_descriptor,
2229  co_delete,
2230  eo,
2231  *catalog,
2232  executor_->row_set_mem_owner_,
2233  delete_callback,
2234  is_aggregate);
2235  post_execution_callback_ = [table_update_metadata, this, catalog]() {
2236  dml_transaction_parameters_->finalizeTransaction(*catalog);
2237  TableOptimizer table_optimizer{
2238  dml_transaction_parameters_->getTableDescriptor(), executor_, *catalog};
2239  table_optimizer.vacuumFragmentsAboveMinSelectivity(table_update_metadata);
2240  };
2241  } catch (const QueryExecutionError& e) {
2242  throw std::runtime_error(getErrorMessageFromCode(e.getErrorCode()));
2243  }
2244  };
2245 
2246  if (table_is_temporary(table_descriptor)) {
2247  auto query_rewrite = std::make_unique<QueryRewriter>(table_infos, executor_);
2248  const auto cd = catalog->getDeletedColumn(table_descriptor);
2249  CHECK(cd);
2250  auto delete_column_expr = makeExpr<Analyzer::ColumnVar>(
2251  cd->columnType,
2253  catalog->getDatabaseId(), table_descriptor->tableId, cd->columnId},
2254  0);
2255  const auto rewritten_exe_unit =
2256  query_rewrite->rewriteColumnarDelete(work_unit.exe_unit, delete_column_expr);
2257  execute_delete_ra_exe_unit(rewritten_exe_unit, is_aggregate);
2258  } else {
2259  execute_delete_ra_exe_unit(work_unit.exe_unit, is_aggregate);
2260  }
2261  };
2262 
2263  if (auto compound = dynamic_cast<const RelCompound*>(node)) {
2264  const auto work_unit = createCompoundWorkUnit(compound, SortInfo(), eo_in);
2265  execute_delete_for_node(compound, work_unit, compound->isAggregate());
2266  } else if (auto project = dynamic_cast<const RelProject*>(node)) {
2267  auto work_unit = createProjectWorkUnit(project, SortInfo(), eo_in);
2268  if (project->isSimple()) {
2269  CHECK_EQ(size_t(1), project->inputCount());
2270  const auto input_ra = project->getInput(0);
2271  if (dynamic_cast<const RelSort*>(input_ra)) {
2272  const auto& input_table =
2273  get_temporary_table(&temporary_tables_, -input_ra->getId());
2274  CHECK(input_table);
2275  work_unit.exe_unit.scan_limit = input_table->rowCount();
2276  }
2277  }
2278  execute_delete_for_node(project, work_unit, false);
2279  } else {
2280  throw std::runtime_error("Unsupported parent node for delete: " +
2281  node->toString(RelRexToStringConfig::defaults()));
2282  }
2283 }
#define CHECK_EQ(x, y)
Definition: Logger.h:301
std::optional< std::function< void()> > post_execution_callback_
int32_t getErrorCode() const
Definition: ErrorHandling.h:63
WorkUnit createProjectWorkUnit(const RelProject *, const SortInfo &, const ExecutionOptions &eo)
StorageIOFacility::UpdateCallback yieldDeleteCallback(DeleteTransactionParameters &delete_parameters)
TemporaryTables temporary_tables_
Driver for running cleanup processes on a table. TableOptimizer provides functions for various cleanu...
WorkUnit createCompoundWorkUnit(const RelCompound *, const SortInfo &, const ExecutionOptions &eo)
const ResultSetPtr & get_temporary_table(const TemporaryTables *temporary_tables, const int table_id)
Definition: Execute.h:246
static CompilationOptions makeCpuOnly(const CompilationOptions &in)
bool table_is_temporary(const TableDescriptor *const td)
static RelRexToStringConfig defaults()
Definition: RelAlgDag.h:78
#define CHECK(condition)
Definition: Logger.h:291
std::vector< InputTableInfo > get_table_infos(const std::vector< InputDescriptor > &input_descs, Executor *executor)
#define DEBUG_TIMER(name)
Definition: Logger.h:412
static void clearExternalCaches(bool for_update, const TableDescriptor *td, const int current_db_id)
Definition: Execute.h:438
static std::string getErrorMessageFromCode(const int32_t error_code)
std::unique_ptr< TransactionParameters > dml_transaction_parameters_
Executor * executor_

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

ExecutionResult RelAlgExecutor::executeFilter ( const RelFilter * filter,
const CompilationOptions & co,
const ExecutionOptions & eo,
RenderInfo * render_info,
const int64_t queue_time_ms
)
private

Definition at line 2707 of file RelAlgExecutor.cpp.

References createFilterWorkUnit(), DEBUG_TIMER, executeWorkUnit(), RelAlgNode::getOutputMetainfo(), and ExecutionOptions::just_explain.

Referenced by executeRelAlgStep().

2711  {
2712  auto timer = DEBUG_TIMER(__func__);
2713  const auto work_unit = createFilterWorkUnit(filter, SortInfo(), eo.just_explain);
2714  return executeWorkUnit(
2715  work_unit, filter->getOutputMetainfo(), false, co, eo, render_info, queue_time_ms);
2716 }
ExecutionResult executeWorkUnit(const WorkUnit &work_unit, const std::vector< TargetMetaInfo > &targets_meta, const bool is_agg, const CompilationOptions &co_in, const ExecutionOptions &eo_in, RenderInfo *, const int64_t queue_time_ms, const std::optional< size_t > previous_count=std::nullopt)
#define DEBUG_TIMER(name)
Definition: Logger.h:412
const std::vector< TargetMetaInfo > & getOutputMetainfo() const
Definition: RelAlgDag.h:865
WorkUnit createFilterWorkUnit(const RelFilter *, const SortInfo &, const bool just_explain)

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

ExecutionResult RelAlgExecutor::executeLogicalValues ( const RelLogicalValues * logical_values,
const ExecutionOptions & eo
)
private

Definition at line 2760 of file RelAlgExecutor.cpp.

References CPU, DEBUG_TIMER, executor_, RelLogicalValues::getNumRows(), RelLogicalValues::getTupleType(), kBIGINT, kCOUNT, kNULLT, heavyai::Projection, query_mem_desc, and RelAlgNode::setOutputMetainfo().

Referenced by executeRelAlgStep().

2762  {
2763  auto timer = DEBUG_TIMER(__func__);
2766 
2767  auto tuple_type = logical_values->getTupleType();
2768  for (size_t i = 0; i < tuple_type.size(); ++i) {
2769  auto& target_meta_info = tuple_type[i];
2770  if (target_meta_info.get_type_info().get_type() == kNULLT) {
2771  // replace w/ bigint
2772  tuple_type[i] =
2773  TargetMetaInfo(target_meta_info.get_resname(), SQLTypeInfo(kBIGINT, false));
2774  }
2775  query_mem_desc.addColSlotInfo(
2776  {std::make_tuple(tuple_type[i].get_type_info().get_size(), 8)});
2777  }
2778  logical_values->setOutputMetainfo(tuple_type);
2779 
2780  std::vector<TargetInfo> target_infos;
2781  for (const auto& tuple_type_component : tuple_type) {
2782  target_infos.emplace_back(TargetInfo{false,
2783  kCOUNT,
2784  tuple_type_component.get_type_info(),
2785  SQLTypeInfo(kNULLT, false),
2786  false,
2787  false,
2788  /*is_varlen_projection=*/false});
2789  }
2790  std::shared_ptr<ResultSet> rs{
2791  ResultSetLogicalValuesBuilder{logical_values,
2792  target_infos,
2795  executor_->getRowSetMemoryOwner(),
2796  executor_}
2797  .build()};
2798 
2799  return {rs, tuple_type};
2800 }
void setOutputMetainfo(std::vector< TargetMetaInfo > targets_metainfo) const
Definition: RelAlgDag.h:850
size_t getNumRows() const
Definition: RelAlgDag.h:2720
const std::vector< TargetMetaInfo > getTupleType() const
Definition: RelAlgDag.h:2687
Projection
Definition: enums.h:58
Definition: sqldefs.h:81
#define DEBUG_TIMER(name)
Definition: Logger.h:412
Executor * executor_

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

ExecutionResult RelAlgExecutor::executeModify ( const RelModify * modify,
const ExecutionOptions & eo
)
private

Definition at line 2850 of file RelAlgExecutor.cpp.

References CPU, DEBUG_TIMER, executor_, and ExecutionOptions::just_explain.

Referenced by executeRelAlgStep().

2851  {
2852  auto timer = DEBUG_TIMER(__func__);
2853  if (eo.just_explain) {
2854  throw std::runtime_error("EXPLAIN not supported for ModifyTable");
2855  }
2856 
2857  auto rs = std::make_shared<ResultSet>(TargetInfoList{},
2860  executor_->getRowSetMemoryOwner(),
2861  executor_->blockSize(),
2862  executor_->gridSize());
2863 
2864  std::vector<TargetMetaInfo> empty_targets;
2865  return {rs, empty_targets};
2866 }
std::vector< TargetInfo > TargetInfoList
#define DEBUG_TIMER(name)
Definition: Logger.h:412
Executor * executor_

+ Here is the caller graph for this function:

void RelAlgExecutor::executePostExecutionCallback ( )

Definition at line 4325 of file RelAlgExecutor.cpp.

References post_execution_callback_, and VLOG.

4325  {
4326  if (post_execution_callback_) {
4327  VLOG(1) << "Running post execution callback.";
4328  (*post_execution_callback_)();
4329  }
4330 }
std::optional< std::function< void()> > post_execution_callback_
#define VLOG(n)
Definition: Logger.h:388
ExecutionResult RelAlgExecutor::executeProject ( const RelProject * project,
const CompilationOptions & co,
const ExecutionOptions & eo,
RenderInfo * render_info,
const int64_t queue_time_ms,
const std::optional< size_t > previous_count
)
private

Definition at line 2331 of file RelAlgExecutor.cpp.

References CHECK, CHECK_EQ, CPU, createProjectWorkUnit(), DEBUG_TIMER, CompilationOptions::device_type, executeWorkUnit(), get_temporary_table(), RelAlgNode::getInput(), RelAlgNode::getOutputMetainfo(), RelAlgNode::inputCount(), RelProject::isSimple(), and temporary_tables_.

Referenced by executeRelAlgStep().

2337  {
2338  auto timer = DEBUG_TIMER(__func__);
2339  auto work_unit = createProjectWorkUnit(project, SortInfo(), eo);
2340  CompilationOptions co_project = co;
2341  if (project->isSimple()) {
2342  CHECK_EQ(size_t(1), project->inputCount());
2343  const auto input_ra = project->getInput(0);
2344  if (dynamic_cast<const RelSort*>(input_ra)) {
2345  co_project.device_type = ExecutorDeviceType::CPU;
2346  const auto& input_table =
2347  get_temporary_table(&temporary_tables_, -input_ra->getId());
2348  CHECK(input_table);
2349  work_unit.exe_unit.scan_limit =
2350  std::min(input_table->getLimit(), input_table->rowCount());
2351  }
2352  }
2353  return executeWorkUnit(work_unit,
2354  project->getOutputMetainfo(),
2355  false,
2356  co_project,
2357  eo,
2358  render_info,
2359  queue_time_ms,
2360  previous_count);
2361 }
#define CHECK_EQ(x, y)
Definition: Logger.h:301
WorkUnit createProjectWorkUnit(const RelProject *, const SortInfo &, const ExecutionOptions &eo)
TemporaryTables temporary_tables_
const ResultSetPtr & get_temporary_table(const TemporaryTables *temporary_tables, const int table_id)
Definition: Execute.h:246
const RelAlgNode * getInput(const size_t idx) const
Definition: RelAlgDag.h:877
bool isSimple() const
Definition: RelAlgDag.h:1307
ExecutorDeviceType device_type
ExecutionResult executeWorkUnit(const WorkUnit &work_unit, const std::vector< TargetMetaInfo > &targets_meta, const bool is_agg, const CompilationOptions &co_in, const ExecutionOptions &eo_in, RenderInfo *, const int64_t queue_time_ms, const std::optional< size_t > previous_count=std::nullopt)
#define CHECK(condition)
Definition: Logger.h:291
#define DEBUG_TIMER(name)
Definition: Logger.h:412
const size_t inputCount() const
Definition: RelAlgDag.h:875
const std::vector< TargetMetaInfo > & getOutputMetainfo() const
Definition: RelAlgDag.h:865

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

ExecutionResult RelAlgExecutor::executeRelAlgQuery ( const CompilationOptions & co,
const ExecutionOptions & eo,
const bool just_explain_plan,
const bool explain_verbose,
RenderInfo * render_info
)

Definition at line 564 of file RelAlgExecutor.cpp.

References CHECK, DEBUG_TIMER, executeRelAlgQueryNoRetry(), RenderInfo::forceNonInSitu(), g_allow_cpu_retry, logger::INFO, INJECT_TIMER, RelAlgDag::kBuiltOptimized, LOG, CompilationOptions::makeCpuOnly(), post_execution_callback_, query_dag_, run_benchmark::run_query(), and VLOG.

568  {
569  CHECK(query_dag_);
571  << static_cast<int>(query_dag_->getBuildState());
572 
573  auto timer = DEBUG_TIMER(__func__);
575 
576  auto run_query = [&](const CompilationOptions& co_in) {
577  auto execution_result = executeRelAlgQueryNoRetry(
578  co_in, eo, just_explain_plan, explain_verbose, render_info);
579 
580  constexpr bool vlog_result_set_summary{false};
581  if constexpr (vlog_result_set_summary) {
582  VLOG(1) << execution_result.getRows()->summaryToString();
583  }
584 
585  if (post_execution_callback_) {
586  VLOG(1) << "Running post execution callback.";
587  (*post_execution_callback_)();
588  }
589  return execution_result;
590  };
591 
592  try {
593  return run_query(co);
594  } catch (const QueryMustRunOnCpu&) {
595  if (!g_allow_cpu_retry) {
596  throw;
597  }
598  }
599  LOG(INFO) << "Query unable to run in GPU mode, retrying on CPU";
600  auto co_cpu = CompilationOptions::makeCpuOnly(co);
601 
602  if (render_info) {
603  render_info->forceNonInSitu();
604  }
605  return run_query(co_cpu);
606 }
std::optional< std::function< void()> > post_execution_callback_
void forceNonInSitu()
Definition: RenderInfo.cpp:46
#define LOG(tag)
Definition: Logger.h:285
static CompilationOptions makeCpuOnly(const CompilationOptions &in)
#define INJECT_TIMER(DESC)
Definition: measure.h:122
std::unique_ptr< RelAlgDag > query_dag_
ExecutionResult executeRelAlgQueryNoRetry(const CompilationOptions &co, const ExecutionOptions &eo, const bool just_explain_plan, const bool explain_verbose, RenderInfo *render_info)
#define CHECK(condition)
Definition: Logger.h:291
#define DEBUG_TIMER(name)
Definition: Logger.h:412
bool g_allow_cpu_retry
Definition: Execute.cpp:93
ExecutionResult executeRelAlgQuery(const CompilationOptions &co, const ExecutionOptions &eo, const bool just_explain_plan, const bool explain_verbose, RenderInfo *render_info)
#define VLOG(n)
Definition: Logger.h:388

+ Here is the call graph for this function:

ExecutionResult RelAlgExecutor::executeRelAlgQueryNoRetry ( const CompilationOptions & co,
const ExecutionOptions & eo,
const bool just_explain_plan,
const bool explain_verbose,
RenderInfo * render_info
)
private

Definition at line 608 of file RelAlgExecutor.cpp.

References ExecutionOptions::allow_runtime_query_interrupt, CHECK, cleanupPostExecution(), DEBUG_TIMER, executeRelAlgQueryWithFilterPushDown(), executeRelAlgSeq(), executor_, ExecutionOptions::find_push_down_candidates, g_enable_dynamic_watchdog, getGlobalQueryHint(), RelAlgDag::getNodes(), getParsedQueryHint(), getRelAlgDag(), getRootRelAlgNode(), getSubqueries(), RelAlgDagViewer::handleQueryEngineVector(), QueryExecutionError::hasErrorCode(), INJECT_TIMER, ExecutionOptions::just_calcite_explain, ExecutionOptions::just_explain, ExecutionOptions::just_validate, anonymous_namespace{RelAlgExecutor.cpp}::prepare_for_system_table_execution(), anonymous_namespace{RelAlgExecutor.cpp}::prepare_foreign_table_for_execution(), query_dag_, query_state_, run_benchmark_import::result, RelAlgDag::setGlobalQueryHints(), setupCaching(), timer_start(), and timer_stop().

Referenced by executeRelAlgQuery().

612  {
614  auto timer = DEBUG_TIMER(__func__);
615  auto timer_setup = DEBUG_TIMER("Query pre-execution steps");
616 
617  query_dag_->resetQueryExecutionState();
618  const auto& ra = query_dag_->getRootNode();
619 
620  // capture the lock acquisition time
621  auto clock_begin = timer_start();
623  executor_->resetInterrupt();
624  }
625  std::string query_session{""};
626  std::string query_str{"N/A"};
627  std::string query_submitted_time{""};
628  // gather necessary query's info
629  if (query_state_ != nullptr && query_state_->getConstSessionInfo() != nullptr) {
630  query_session = query_state_->getConstSessionInfo()->get_session_id();
631  query_str = query_state_->getQueryStr();
632  query_submitted_time = query_state_->getQuerySubmittedTime();
633  }
634 
635  auto validate_or_explain_query =
636  just_explain_plan || eo.just_validate || eo.just_explain || eo.just_calcite_explain;
637  auto interruptable = !render_info && !query_session.empty() &&
638  eo.allow_runtime_query_interrupt && !validate_or_explain_query;
639  if (interruptable) {
640  // if we reach here, the current query which was waiting an idle executor
641  // within the dispatch queue is now scheduled to the specific executor
642  // (not UNITARY_EXECUTOR)
643  // so we update the query session's status with the executor that takes this query
644  std::tie(query_session, query_str) = executor_->attachExecutorToQuerySession(
645  query_session, query_str, query_submitted_time);
646 
647  // now the query is going to be executed, so update the status as
648  // "RUNNING_QUERY_KERNEL"
649  executor_->updateQuerySessionStatus(
650  query_session,
651  query_submitted_time,
652  QuerySessionStatus::QueryStatus::RUNNING_QUERY_KERNEL);
653  }
654 
655  // so it should do cleanup session info after finishing its execution
656  ScopeGuard clearQuerySessionInfo =
657  [this, &query_session, &interruptable, &query_submitted_time] {
658  // reset the runtime query interrupt status after the end of query execution
659  if (interruptable) {
660  // cleanup running session's info
661  executor_->clearQuerySessionStatus(query_session, query_submitted_time);
662  }
663  };
664 
665  auto acquire_execute_mutex = [](Executor * executor) -> auto{
666  auto ret = executor->acquireExecuteMutex();
667  return ret;
668  };
669  // now we acquire executor lock in here to make sure that this executor holds
670  // all necessary resources and at the same time protect them against other executor
671  auto lock = acquire_execute_mutex(executor_);
672 
673  if (interruptable) {
674  // check whether this query session is "already" interrupted
675  // this case occurs when there is very short gap between being interrupted and
676  // taking the execute lock
677  // if so we have to remove "all" queries initiated by this session and we do in here
678  // without running the query
679  try {
680  executor_->checkPendingQueryStatus(query_session);
681  } catch (QueryExecutionError& e) {
682  if (e.hasErrorCode(ErrorCode::INTERRUPTED)) {
683  throw std::runtime_error("Query execution has been interrupted (pending query)");
684  }
685  throw e;
686  } catch (...) {
687  throw std::runtime_error("Checking pending query status failed: unknown error");
688  }
689  }
690  int64_t queue_time_ms = timer_stop(clock_begin);
691 
693 
694  // Notify foreign tables to load prior to caching
696 
697  ScopeGuard row_set_holder = [this] { cleanupPostExecution(); };
698  setupCaching(&ra);
699 
700  ScopeGuard restore_metainfo_cache = [this] { executor_->clearMetaInfoCache(); };
701  auto ed_seq = RaExecutionSequence(&ra, executor_);
702 
703  if (just_explain_plan) {
704  const auto& ra = getRootRelAlgNode();
705  auto ed_seq = RaExecutionSequence(&ra, executor_);
706  std::vector<const RelAlgNode*> steps;
707  for (size_t i = 0; i < ed_seq.size(); i++) {
708  steps.emplace_back(ed_seq.getDescriptor(i)->getBody());
709  steps.back()->setStepNumber(i + 1);
710  }
711  std::stringstream ss;
712  auto& nodes = getRelAlgDag()->getNodes();
713  RelAlgDagViewer dagv(false, explain_verbose);
714  dagv.handleQueryEngineVector(nodes);
715  ss << dagv;
716  auto rs = std::make_shared<ResultSet>(ss.str());
717  return {rs, {}};
718  }
719 
720  if (eo.find_push_down_candidates) {
721  // this extra logic is mainly due to current limitations on multi-step queries
722  // and/or subqueries.
723  return executeRelAlgQueryWithFilterPushDown(
724  ed_seq, co, eo, render_info, queue_time_ms);
725  }
726  timer_setup.stop();
727 
728  // Dispatch the subqueries first
729  const auto global_hints = getGlobalQueryHint();
730  for (auto subquery : getSubqueries()) {
731  const auto subquery_ra = subquery->getRelAlg();
732  CHECK(subquery_ra);
733  if (subquery_ra->hasContextData()) {
734  continue;
735  }
736  // Execute the subquery and cache the result.
737  RelAlgExecutor subquery_executor(executor_, query_state_);
738  // Propagate global and local query hint if necessary
739  const auto local_hints = getParsedQueryHint(subquery_ra);
740  if (global_hints || local_hints) {
741  const auto subquery_rel_alg_dag = subquery_executor.getRelAlgDag();
742  if (global_hints) {
743  subquery_rel_alg_dag->setGlobalQueryHints(*global_hints);
744  }
745  if (local_hints) {
746  subquery_rel_alg_dag->registerQueryHint(subquery_ra, *local_hints);
747  }
748  }
749  RaExecutionSequence subquery_seq(subquery_ra, executor_);
750  auto result = subquery_executor.executeRelAlgSeq(subquery_seq, co, eo, nullptr, 0);
751  subquery->setExecutionResult(std::make_shared<ExecutionResult>(result));
752  }
753  return executeRelAlgSeq(ed_seq, co, eo, render_info, queue_time_ms);
754 }
ExecutionResult executeRelAlgSeq(const RaExecutionSequence &seq, const CompilationOptions &co, const ExecutionOptions &eo, RenderInfo *render_info, const int64_t queue_time_ms, const bool with_existing_temp_tables=false)
ExecutionResult executeRelAlgQueryWithFilterPushDown(const RaExecutionSequence &seq, const CompilationOptions &co, const ExecutionOptions &eo, RenderInfo *render_info, const int64_t queue_time_ms)
void setupCaching(const RelAlgNode *ra)
TypeR::rep timer_stop(Type clock_begin)
Definition: measure.h:48
bool g_enable_dynamic_watchdog
Definition: Execute.cpp:81
std::optional< RegisteredQueryHint > getParsedQueryHint(const RelAlgNode *node)
const std::vector< std::shared_ptr< RexSubQuery > > & getSubqueries() const noexcept
std::vector< std::shared_ptr< RelAlgNode > > & getNodes()
Definition: RelAlgDag.h:2824
#define INJECT_TIMER(DESC)
Definition: measure.h:122
A container for relational algebra descriptors defining the execution order for a relational algebra ...
bool hasErrorCode(ErrorCode const ec) const
Definition: ErrorHandling.h:65
std::optional< RegisteredQueryHint > getGlobalQueryHint()
std::unique_ptr< RelAlgDag > query_dag_
void prepare_foreign_table_for_execution(const RelAlgNode &ra_node)
std::shared_ptr< const query_state::QueryState > query_state_
const RelAlgNode & getRootRelAlgNode() const
ExecutionResult executeRelAlgQueryNoRetry(const CompilationOptions &co, const ExecutionOptions &eo, const bool just_explain_plan, const bool explain_verbose, RenderInfo *render_info)
void prepare_for_system_table_execution(const RelAlgNode &ra_node, const CompilationOptions &co)
#define CHECK(condition)
Definition: Logger.h:291
#define DEBUG_TIMER(name)
Definition: Logger.h:412
void cleanupPostExecution()
RelAlgDag * getRelAlgDag()
Executor * executor_
Type timer_start()
Definition: measure.h:42

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

QueryStepExecutionResult RelAlgExecutor::executeRelAlgQuerySingleStep ( const RaExecutionSequence & seq,
const size_t step_idx,
const CompilationOptions & co,
const ExecutionOptions & eo,
RenderInfo * render_info
)

Definition at line 806 of file RelAlgExecutor.cpp.

References CHECK, CHECK_EQ, anonymous_namespace{RelAlgExecutor.cpp}::check_sort_node_source_constraint(), CPU, createSortInputWorkUnit(), CompilationOptions::device_type, executeRelAlgSubSeq(), RaExecutionSequence::getDescriptor(), GPU, logger::INFO, INJECT_TIMER, ExecutionOptions::just_validate, LOG, anonymous_namespace{RelAlgExecutor.cpp}::node_is_aggregate(), post_execution_callback_, queue_time_ms_, Reduce, GroupByAndAggregate::shard_count_for_top_groups(), gpu_enabled::sort(), Union, and VLOG.

811  {
812  INJECT_TIMER(executeRelAlgQueryStep);
813 
814  auto exe_desc_ptr = seq.getDescriptor(step_idx);
815  CHECK(exe_desc_ptr);
816  const auto sort = dynamic_cast<const RelSort*>(exe_desc_ptr->getBody());
817 
818  size_t shard_count{0};
819  auto merge_type = [&shard_count](const RelAlgNode* body) -> MergeType {
820  return node_is_aggregate(body) && !shard_count ? MergeType::Reduce : MergeType::Union;
821  };
822 
823  if (sort) {
825  auto order_entries = sort->getOrderEntries();
826  const auto source_work_unit = createSortInputWorkUnit(sort, order_entries, eo);
827  shard_count =
828  GroupByAndAggregate::shard_count_for_top_groups(source_work_unit.exe_unit);
829  if (!shard_count) {
830  // No point in sorting on the leaf, only execute the input to the sort node.
831  CHECK_EQ(size_t(1), sort->inputCount());
832  const auto source = sort->getInput(0);
833  if (sort->collationCount() || node_is_aggregate(source)) {
834  auto temp_seq = RaExecutionSequence(std::make_unique<RaExecutionDesc>(source));
835  CHECK_EQ(temp_seq.size(), size_t(1));
836  ExecutionOptions eo_copy = eo;
837  eo_copy.just_validate = eo.just_validate || sort->isEmptyResult();
838  // Use subseq to avoid clearing existing temporary tables
839  return {
840  executeRelAlgSubSeq(temp_seq, std::make_pair(0, 1), co, eo_copy, nullptr, 0),
841  merge_type(source),
842  source->getId(),
843  false};
844  }
845  }
846  }
847  QueryStepExecutionResult query_step_result{ExecutionResult{},
848  merge_type(exe_desc_ptr->getBody()),
849  exe_desc_ptr->getBody()->getId(),
850  false};
851  try {
852  query_step_result.result = executeRelAlgSubSeq(
853  seq, std::make_pair(step_idx, step_idx + 1), co, eo, render_info, queue_time_ms_);
854  } catch (QueryMustRunOnCpu const& e) {
856  auto copied_co = co;
858  LOG(INFO) << "Retry the query via CPU mode";
859  query_step_result.result = executeRelAlgSubSeq(seq,
860  std::make_pair(step_idx, step_idx + 1),
861  copied_co,
862  eo,
863  render_info,
865  }
866  if (post_execution_callback_) {
867  VLOG(1) << "Running post execution callback.";
868  (*post_execution_callback_)();
869  }
870  return query_step_result;
871 }
int64_t queue_time_ms_
#define CHECK_EQ(x, y)
Definition: Logger.h:301
std::optional< std::function< void()> > post_execution_callback_
RaExecutionDesc * getDescriptor(size_t idx) const
#define LOG(tag)
Definition: Logger.h:285
DEVICE void sort(ARGS &&...args)
Definition: gpu_enabled.h:105
void check_sort_node_source_constraint(const RelSort *sort)
#define INJECT_TIMER(DESC)
Definition: measure.h:122
MergeType
A container for relational algebra descriptors defining the execution order for a relational algebra ...
ExecutionResult executeRelAlgSubSeq(const RaExecutionSequence &seq, const std::pair< size_t, size_t > interval, const CompilationOptions &co, const ExecutionOptions &eo, RenderInfo *render_info, const int64_t queue_time_ms)
ExecutorDeviceType device_type
bool node_is_aggregate(const RelAlgNode *ra)
static size_t shard_count_for_top_groups(const RelAlgExecutionUnit &ra_exe_unit)
#define CHECK(condition)
Definition: Logger.h:291
#define VLOG(n)
Definition: Logger.h:388
WorkUnit createSortInputWorkUnit(const RelSort *, std::list< Analyzer::OrderEntry > &order_entries, const ExecutionOptions &eo)

+ Here is the call graph for this function:

ExecutionResult RelAlgExecutor::executeRelAlgQueryWithFilterPushDown ( const RaExecutionSequence & seq,
const CompilationOptions & co,
const ExecutionOptions & eo,
RenderInfo * render_info,
const int64_t queue_time_ms
)

Definition at line 148 of file JoinFilterPushDown.cpp.

References CHECK, executeRelAlgSeq(), executor_, ExecutionOptions::find_push_down_candidates, getSubqueries(), ExecutionOptions::just_calcite_explain, run_benchmark_import::result, and RaExecutionSequence::size().

Referenced by executeRelAlgQueryNoRetry().

153  {
154  // we currently do not fully support filter push down with
155  // multi-step execution and/or with subqueries
156  // TODO(Saman): add proper caching to enable filter push down for all cases
157  const auto& subqueries = getSubqueries();
158  if (seq.size() > 1 || !subqueries.empty()) {
159  if (eo.just_calcite_explain) {
160  return ExecutionResult(std::vector<PushedDownFilterInfo>{},
162  }
163  auto eo_modified = eo;
164  eo_modified.find_push_down_candidates = false;
165  eo_modified.just_calcite_explain = false;
166 
167  // Dispatch the subqueries first
168  for (auto subquery : subqueries) {
169  // Execute the subquery and cache the result.
170  RelAlgExecutor ra_executor(executor_);
171  const auto subquery_ra = subquery->getRelAlg();
172  CHECK(subquery_ra);
173  RaExecutionSequence subquery_seq(subquery_ra, executor_);
174  auto result =
175  ra_executor.executeRelAlgSeq(subquery_seq, co, eo_modified, nullptr, 0);
176  subquery->setExecutionResult(std::make_shared<ExecutionResult>(result));
177  }
178  return executeRelAlgSeq(seq, co, eo_modified, render_info, queue_time_ms);
179  }
180  // else
181  return executeRelAlgSeq(seq, co, eo, render_info, queue_time_ms);
182 }
ExecutionResult executeRelAlgSeq(const RaExecutionSequence &seq, const CompilationOptions &co, const ExecutionOptions &eo, RenderInfo *render_info, const int64_t queue_time_ms, const bool with_existing_temp_tables=false)
const std::vector< std::shared_ptr< RexSubQuery > > & getSubqueries() const noexcept
A container for relational algebra descriptors defining the execution order for a relational algebra ...
#define CHECK(condition)
Definition: Logger.h:291
Executor * executor_

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

ExecutionResult RelAlgExecutor::executeRelAlgSeq ( const RaExecutionSequence seq,
const CompilationOptions co,
const ExecutionOptions eo,
RenderInfo render_info,
const int64_t  queue_time_ms,
const bool  with_existing_temp_tables = false 
)

Definition at line 890 of file RelAlgExecutor.cpp.

References addTemporaryTable(), CHECK, CHECK_GE, DEBUG_TIMER, CompilationOptions::device_type, RaExecutionSequence::empty(), executeRelAlgStep(), executor_, RenderInfo::forceNonInSitu(), g_allow_query_step_cpu_retry, g_allow_query_step_skipping, g_cluster, g_enable_interop, RaExecutionDesc::getBody(), RaExecutionSequence::getDescriptor(), RaExecutionSequence::getSkippedQueryStepCacheKeys(), GPU, RaExecutionSequence::hasQueryStepForUnion(), logger::INFO, INJECT_TIMER, ExecutionOptions::just_explain, ExecutionOptions::keep_result, left_deep_join_info_, LOG, CompilationOptions::makeCpuOnly(), now_, RaExecutionSequence::size(), gpu_enabled::swap(), target_exprs_owned_, temporary_tables_, and VLOG.

Referenced by executeRelAlgQueryNoRetry(), and executeRelAlgQueryWithFilterPushDown().

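895  {
896  INJECT_TIMER(executeRelAlgSeq);
897  auto timer = DEBUG_TIMER(__func__);
898  if (!with_existing_temp_tables) {
899  decltype(temporary_tables_)().swap(temporary_tables_);
900  }
901  decltype(target_exprs_owned_)().swap(target_exprs_owned_);
902  decltype(left_deep_join_info_)().swap(left_deep_join_info_);
903  executor_->temporary_tables_ = &temporary_tables_;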
904 
905  time(&now_);
906  CHECK(!seq.empty());
907 
908  auto get_descriptor_count = [&seq, &eo]() -> size_t {
909  if (eo.just_explain) {
910  if (dynamic_cast<const RelLogicalValues*>(seq.getDescriptor(0)->getBody())) {
911  // run the logical values descriptor to generate the result set, then the next
912  // descriptor to generate the explain
913  CHECK_GE(seq.size(), size_t(2));
914  return 2;
915  } else {
916  return 1;
917  }
918  } else {
919  return seq.size();
920  }
921  };
922 
923  const auto exec_desc_count = get_descriptor_count();
924  auto eo_copied = eo;
925  if (seq.hasQueryStepForUnion()) {
926  // we currently do not support resultset recycling when an input query
927  // contains union (all) operation
928  eo_copied.keep_result = false;
929  }
930 
931  // we have to register resultset(s) of the skipped query step(s) as temporary table
932  // before executing the remaining query steps
933  // since they may be required during the query processing
934  // i.e., get metadata of the target expression from the skipped query step
935  if (g_allow_query_step_skipping) {
936  for (const auto& kv : seq.getSkippedQueryStepCacheKeys()) {
937  const auto cached_res =
938  executor_->getResultSetRecyclerHolder().getCachedQueryResultSet(kv.second);
939  CHECK(cached_res);
940  addTemporaryTable(kv.first, cached_res);
941  }
942  }
943 
944  const auto num_steps = exec_desc_count - 1;
945  for (size_t i = 0; i < exec_desc_count; i++) {
946  VLOG(1) << "Executing query step " << i << " / " << num_steps;
947  try {
948  executeRelAlgStep(
949  seq, i, co, eo_copied, (i == num_steps) ? render_info : nullptr, queue_time_ms);
950  } catch (const QueryMustRunOnCpu&) {
951  // Do not allow per-step retry if flag is off or in distributed mode
952  // TODO(todd): Determine if and when we can relax this restriction
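953  // for distributed
954  CHECK(co.device_type == ExecutorDeviceType::GPU);
955  if (!g_allow_query_step_cpu_retry || g_cluster) {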
956  throw;
957  }
958  LOG(INFO) << "Retrying current query step " << i << " / " << num_steps << " on CPU";
959  const auto co_cpu = CompilationOptions::makeCpuOnly(co);
960  if (render_info && i == num_steps) {
961  // only render on the last step
962  render_info->forceNonInSitu();
963  }
964  executeRelAlgStep(seq,
965  i,
966  co_cpu,
967  eo_copied,
968  (i == num_steps) ? render_info : nullptr,
969  queue_time_ms);
970  } catch (const NativeExecutionError&) {
971  if (!g_enable_interop) {
972  throw;
973  }
974  auto eo_extern = eo_copied;
975  eo_extern.executor_type = ::ExecutorType::Extern;
976  auto exec_desc_ptr = seq.getDescriptor(i);
977  const auto body = exec_desc_ptr->getBody();
978  const auto compound = dynamic_cast<const RelCompound*>(body);
979  if (compound && (compound->getGroupByCount() || compound->isAggregate())) {
980  LOG(INFO) << "Also failed to run the query using interoperability";
981  throw;
982  }
983  executeRelAlgStep(
984  seq, i, co, eo_extern, (i == num_steps) ? render_info : nullptr, queue_time_ms);
985  }
986  }
987 
988  return seq.getDescriptor(num_steps)->getResult();
989 }
RaExecutionDesc * getDescriptor(size_t idx) const
ExecutionResult executeRelAlgSeq(const RaExecutionSequence &seq, const CompilationOptions &co, const ExecutionOptions &eo, RenderInfo *render_info, const int64_t queue_time_ms, const bool with_existing_temp_tables=false)
void forceNonInSitu()
Definition: RenderInfo.cpp:46
const bool hasQueryStepForUnion() const
#define LOG(tag)
Definition: Logger.h:285
bool g_allow_query_step_skipping
Definition: Execute.cpp:163
const std::unordered_map< int, QueryPlanHash > getSkippedQueryStepCacheKeys() const
TemporaryTables temporary_tables_
#define CHECK_GE(x, y)
Definition: Logger.h:306
void addTemporaryTable(const int table_id, const ResultSetPtr &result)
bool g_enable_interop
std::vector< std::shared_ptr< Analyzer::Expr > > target_exprs_owned_
static CompilationOptions makeCpuOnly(const CompilationOptions &in)
#define INJECT_TIMER(DESC)
Definition: measure.h:122
void executeRelAlgStep(const RaExecutionSequence &seq, const size_t step_idx, const CompilationOptions &, const ExecutionOptions &, RenderInfo *, const int64_t queue_time_ms)
ExecutorDeviceType device_type
std::unordered_map< unsigned, JoinQualsPerNestingLevel > left_deep_join_info_
#define CHECK(condition)
Definition: Logger.h:291
#define DEBUG_TIMER(name)
Definition: Logger.h:412
const RelAlgNode * getBody() const
bool g_allow_query_step_cpu_retry
Definition: Execute.cpp:94
bool g_cluster
Executor * executor_
DEVICE void swap(ARGS &&...args)
Definition: gpu_enabled.h:114
#define VLOG(n)
Definition: Logger.h:388

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void RelAlgExecutor::executeRelAlgStep ( const RaExecutionSequence seq,
const size_t  step_idx,
const CompilationOptions co,
const ExecutionOptions eo,
RenderInfo render_info,
const int64_t  queue_time_ms 
)
private

Definition at line 1154 of file RelAlgExecutor.cpp.

References addTemporaryTable(), QueryPlanDagExtractor::applyLimitClauseToCacheKey(), build_render_targets(), canUseResultsetCache(), CHECK, SortInfo::createFromSortNode(), DEBUG_TIMER, RelRexToStringConfig::defaults(), executeAggregate(), executeCompound(), executeDelete(), executeFilter(), executeLogicalValues(), executeModify(), executeProject(), executeSort(), executeTableFunction(), executeUnion(), executeUpdate(), executor_, logger::FATAL, g_cluster, g_skip_intermediate_count, RelAlgNode::getContextData(), RaExecutionSequence::getDescriptor(), RaExecutionSequence::getDescriptorByBodyId(), RelAlgNode::getId(), RelAlgNode::getInput(), getParsedQueryHint(), anonymous_namespace{RelAlgDag.cpp}::handle_query_hint(), handleNop(), anonymous_namespace{RelAlgExecutor.cpp}::has_valid_query_plan_dag(), RelAlgNode::hasContextData(), RaExecutionSequence::hasQueryStepForUnion(), INJECT_TIMER, LOG, ExecutionOptions::outer_fragment_indices, setHasStepForUnion(), gpu_enabled::sort(), VLOG, and ExecutionOptions::with_watchdog.

Referenced by executeRelAlgSeq(), and executeRelAlgSubSeq().

1159  {
1160  INJECT_TIMER(executeRelAlgStep);
1161  auto timer = DEBUG_TIMER(__func__);
1162  auto exec_desc_ptr = seq.getDescriptor(step_idx);
1163  CHECK(exec_desc_ptr);
1164  auto& exec_desc = *exec_desc_ptr;
1165  const auto body = exec_desc.getBody();
1166  if (body->isNop()) {
1167  handleNop(exec_desc);
1168  return;
1169  }
1170  ExecutionOptions eo_copied = eo;
1171  CompilationOptions co_copied = co;
1172  eo_copied.with_watchdog =
1173  eo.with_watchdog && (step_idx == 0 || dynamic_cast<const RelProject*>(body));
1174  eo_copied.outer_fragment_indices =
1175  step_idx == 0 ? eo.outer_fragment_indices : std::vector<size_t>();
1176 
1177  auto target_node = body;
1178  auto query_plan_dag_hash = body->getQueryPlanDagHash();
1179  if (auto sort_body = dynamic_cast<const RelSort*>(body)) {
1180  target_node = sort_body->getInput(0);
1181  query_plan_dag_hash = QueryPlanDagExtractor::applyLimitClauseToCacheKey(
1182  target_node->getQueryPlanDagHash(), SortInfo::createFromSortNode(sort_body));
1183  } else {
1184  query_plan_dag_hash = QueryPlanDagExtractor::applyLimitClauseToCacheKey(
1185  target_node->getQueryPlanDagHash(), SortInfo());
1186  }
1187  auto query_hints = getParsedQueryHint(target_node);
1188  if (query_hints) {
1189  handle_query_hint(*query_hints, eo_copied, co_copied);
1190  }
1191 
1192  setHasStepForUnion(seq.hasQueryStepForUnion());
1193  if (canUseResultsetCache(eo, render_info) && has_valid_query_plan_dag(body)) {
1194  if (auto cached_resultset =
1195  executor_->getResultSetRecyclerHolder().getCachedQueryResultSet(
1196  query_plan_dag_hash)) {
1197  VLOG(1) << "recycle resultset of the root node " << body->getRelNodeDagId()
1198  << " from resultset cache";
1199  body->setOutputMetainfo(cached_resultset->getTargetMetaInfo());
1200  if (render_info) {
1201  std::vector<std::shared_ptr<Analyzer::Expr>>& cached_target_exprs =
1202  executor_->getResultSetRecyclerHolder().getTargetExprs(query_plan_dag_hash);
1203  std::vector<Analyzer::Expr*> copied_target_exprs;
1204  for (const auto& expr : cached_target_exprs) {
1205  copied_target_exprs.push_back(expr.get());
1206  }
1207  build_render_targets(
1208  *render_info, copied_target_exprs, cached_resultset->getTargetMetaInfo());
1209  }
1210  exec_desc.setResult({cached_resultset, cached_resultset->getTargetMetaInfo()});
1211  addTemporaryTable(-body->getId(), exec_desc.getResult().getDataPtr());
1212  return;
1213  }
1214  }
1215 
1216  const auto compound = dynamic_cast<const RelCompound*>(body);
1217  if (compound) {
1218  if (compound->isDeleteViaSelect()) {
1219  executeDelete(compound, co_copied, eo_copied, queue_time_ms);
1220  } else if (compound->isUpdateViaSelect()) {
1221  executeUpdate(compound, co_copied, eo_copied, queue_time_ms);
1222  } else {
1223  exec_desc.setResult(
1224  executeCompound(compound, co_copied, eo_copied, render_info, queue_time_ms));
1225  VLOG(3) << "Returned from executeCompound(), addTemporaryTable("
1226  << static_cast<int>(-compound->getId()) << ", ...)"
1227  << " exec_desc.getResult().getDataPtr()->rowCount()="
1228  << exec_desc.getResult().getDataPtr()->rowCount();
1229  if (exec_desc.getResult().isFilterPushDownEnabled()) {
1230  return;
1231  }
1232  addTemporaryTable(-compound->getId(), exec_desc.getResult().getDataPtr());
1233  }
1234  return;
1235  }
1236  const auto project = dynamic_cast<const RelProject*>(body);
1237  if (project) {
1238  if (project->isDeleteViaSelect()) {
1239  executeDelete(project, co_copied, eo_copied, queue_time_ms);
1240  } else if (project->isUpdateViaSelect()) {
1241  executeUpdate(project, co_copied, eo_copied, queue_time_ms);
1242  } else {
1243  std::optional<size_t> prev_count;
1244  // Disabling the intermediate count optimization in distributed, as the previous
1245  // execution descriptor will likely not hold the aggregated result.
1246  if (g_skip_intermediate_count && step_idx > 0 && !g_cluster) {
1247  // If the previous node produced a reliable count, skip the pre-flight count.
1248  RelAlgNode const* const prev_body = project->getInput(0);
1249  if (shared::dynamic_castable_to_any<RelCompound, RelLogicalValues>(prev_body)) {
1250  if (RaExecutionDesc const* const prev_exec_desc =
1251  prev_body->hasContextData()
1252  ? prev_body->getContextData()
1253  : seq.getDescriptorByBodyId(prev_body->getId(), step_idx - 1)) {
1254  const auto& prev_exe_result = prev_exec_desc->getResult();
1255  const auto prev_result = prev_exe_result.getRows();
1256  if (prev_result) {
1257  prev_count = prev_result->rowCount();
1258  VLOG(3) << "Setting output row count for projection node to previous node ("
1259  << prev_exec_desc->getBody()->toString(
1260  RelRexToStringConfig::defaults())
1261  << ") to " << *prev_count;
1262  }
1263  }
1264  }
1265  }
1266  exec_desc.setResult(executeProject(
1267  project, co_copied, eo_copied, render_info, queue_time_ms, prev_count));
1268  VLOG(3) << "Returned from executeProject(), addTemporaryTable("
1269  << static_cast<int>(-project->getId()) << ", ...)"
1270  << " exec_desc.getResult().getDataPtr()->rowCount()="
1271  << exec_desc.getResult().getDataPtr()->rowCount();
1272  if (exec_desc.getResult().isFilterPushDownEnabled()) {
1273  return;
1274  }
1275  addTemporaryTable(-project->getId(), exec_desc.getResult().getDataPtr());
1276  }
1277  return;
1278  }
1279  const auto aggregate = dynamic_cast<const RelAggregate*>(body);
1280  if (aggregate) {
1281  exec_desc.setResult(
1282  executeAggregate(aggregate, co_copied, eo_copied, render_info, queue_time_ms));
1283  addTemporaryTable(-aggregate->getId(), exec_desc.getResult().getDataPtr());
1284  return;
1285  }
1286  const auto filter = dynamic_cast<const RelFilter*>(body);
1287  if (filter) {
1288  exec_desc.setResult(
1289  executeFilter(filter, co_copied, eo_copied, render_info, queue_time_ms));
1290  addTemporaryTable(-filter->getId(), exec_desc.getResult().getDataPtr());
1291  return;
1292  }
1293  const auto sort = dynamic_cast<const RelSort*>(body);
1294  if (sort) {
1295  exec_desc.setResult(
1296  executeSort(sort, co_copied, eo_copied, render_info, queue_time_ms));
1297  if (exec_desc.getResult().isFilterPushDownEnabled()) {
1298  return;
1299  }
1300  addTemporaryTable(-sort->getId(), exec_desc.getResult().getDataPtr());
1301  return;
1302  }
1303  const auto logical_values = dynamic_cast<const RelLogicalValues*>(body);
1304  if (logical_values) {
1305  exec_desc.setResult(executeLogicalValues(logical_values, eo_copied));
1306  addTemporaryTable(-logical_values->getId(), exec_desc.getResult().getDataPtr());
1307  return;
1308  }
1309  const auto modify = dynamic_cast<const RelModify*>(body);
1310  if (modify) {
1311  exec_desc.setResult(executeModify(modify, eo_copied));
1312  return;
1313  }
1314  const auto logical_union = dynamic_cast<const RelLogicalUnion*>(body);
1315  if (logical_union) {
1316  exec_desc.setResult(executeUnion(
1317  logical_union, seq, co_copied, eo_copied, render_info, queue_time_ms));
1318  addTemporaryTable(-logical_union->getId(), exec_desc.getResult().getDataPtr());
1319  return;
1320  }
1321  const auto table_func = dynamic_cast<const RelTableFunction*>(body);
1322  if (table_func) {
1323  exec_desc.setResult(
1324  executeTableFunction(table_func, co_copied, eo_copied, queue_time_ms));
1325  addTemporaryTable(-table_func->getId(), exec_desc.getResult().getDataPtr());
1326  return;
1327  }
1328  LOG(FATAL) << "Unhandled body type: "
1329  << body->toString(RelRexToStringConfig::defaults());
1330 }
ExecutionResult executeAggregate(const RelAggregate *aggregate, const CompilationOptions &co, const ExecutionOptions &eo, RenderInfo *render_info, const int64_t queue_time_ms)
RaExecutionDesc * getDescriptor(size_t idx) const
bool has_valid_query_plan_dag(const RelAlgNode *node)
static size_t applyLimitClauseToCacheKey(size_t cache_key, SortInfo const &sort_info)
const bool hasQueryStepForUnion() const
bool g_skip_intermediate_count
#define LOG(tag)
Definition: Logger.h:285
std::vector< size_t > outer_fragment_indices
void handle_query_hint(const std::vector< std::shared_ptr< RelAlgNode >> &nodes, RelAlgDag &rel_alg_dag) noexcept
Definition: RelAlgDag.cpp:1573
DEVICE void sort(ARGS &&...args)
Definition: gpu_enabled.h:105
ExecutionResult executeModify(const RelModify *modify, const ExecutionOptions &eo)
void addTemporaryTable(const int table_id, const ResultSetPtr &result)
ExecutionResult executeLogicalValues(const RelLogicalValues *, const ExecutionOptions &)
std::optional< RegisteredQueryHint > getParsedQueryHint(const RelAlgNode *node)
static SortInfo createFromSortNode(const RelSort *sort_node)
ExecutionResult executeFilter(const RelFilter *, const CompilationOptions &, const ExecutionOptions &, RenderInfo *, const int64_t queue_time_ms)
void handleNop(RaExecutionDesc &ed)
unsigned getId() const
Definition: RelAlgDag.h:869
#define INJECT_TIMER(DESC)
Definition: measure.h:122
void build_render_targets(RenderInfo &render_info, const std::vector< Analyzer::Expr * > &work_unit_target_exprs, const std::vector< TargetMetaInfo > &targets_meta)
void executeUpdate(const RelAlgNode *node, const CompilationOptions &co, const ExecutionOptions &eo, const int64_t queue_time_ms)
const RelAlgNode * getInput(const size_t idx) const
Definition: RelAlgDag.h:877
void executeRelAlgStep(const RaExecutionSequence &seq, const size_t step_idx, const CompilationOptions &, const ExecutionOptions &, RenderInfo *, const int64_t queue_time_ms)
ExecutionResult executeTableFunction(const RelTableFunction *, const CompilationOptions &, const ExecutionOptions &, const int64_t queue_time_ms)
const RaExecutionDesc * getContextData() const
Definition: RelAlgDag.h:873
ExecutionResult executeUnion(const RelLogicalUnion *, const RaExecutionSequence &, const CompilationOptions &, const ExecutionOptions &, RenderInfo *, const int64_t queue_time_ms)
static RelRexToStringConfig defaults()
Definition: RelAlgDag.h:78
bool canUseResultsetCache(const ExecutionOptions &eo, RenderInfo *render_info) const
ExecutionResult executeSort(const RelSort *, const CompilationOptions &, const ExecutionOptions &, RenderInfo *, const int64_t queue_time_ms)
ExecutionResult executeProject(const RelProject *, const CompilationOptions &, const ExecutionOptions &, RenderInfo *, const int64_t queue_time_ms, const std::optional< size_t > previous_count)
#define CHECK(condition)
Definition: Logger.h:291
#define DEBUG_TIMER(name)
Definition: Logger.h:412
ExecutionResult executeCompound(const RelCompound *, const CompilationOptions &, const ExecutionOptions &, RenderInfo *, const int64_t queue_time_ms)
bool g_cluster
void executeDelete(const RelAlgNode *node, const CompilationOptions &co, const ExecutionOptions &eo_in, const int64_t queue_time_ms)
RaExecutionDesc * getDescriptorByBodyId(unsigned const body_id, size_t const start_idx) const
Executor * executor_
bool hasContextData() const
Definition: RelAlgDag.h:871
#define VLOG(n)
Definition: Logger.h:388
void setHasStepForUnion(bool flag)

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

ExecutionResult RelAlgExecutor::executeRelAlgSubSeq ( const RaExecutionSequence seq,
const std::pair< size_t, size_t >  interval,
const CompilationOptions co,
const ExecutionOptions eo,
RenderInfo render_info,
const int64_t  queue_time_ms 
)

Definition at line 991 of file RelAlgExecutor.cpp.

References CHECK, CompilationOptions::device_type, executeRelAlgStep(), executor_, RenderInfo::forceNonInSitu(), g_allow_query_step_cpu_retry, g_cluster, RaExecutionSequence::getDescriptor(), GPU, logger::INFO, INJECT_TIMER, left_deep_join_info_, LOG, CompilationOptions::makeCpuOnly(), now_, gpu_enabled::swap(), and temporary_tables_.

Referenced by executeRelAlgQuerySingleStep().

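997  {
998  INJECT_TIMER(executeRelAlgSubSeq);
999  executor_->temporary_tables_ = &temporary_tables_;
1000  decltype(left_deep_join_info_)().swap(left_deep_join_info_);
1001  time(&now_);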
1002  for (size_t i = interval.first; i < interval.second; i++) {
1003  // only render on the last step
1004  try {
1005  executeRelAlgStep(seq,
1006  i,
1007  co,
1008  eo,
1009  (i == interval.second - 1) ? render_info : nullptr,
1010  queue_time_ms);
1011  } catch (const QueryMustRunOnCpu&) {
1012  // Do not allow per-step retry if flag is off or in distributed mode
1013  // TODO(todd): Determine if and when we can relax this restriction
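1014  // for distributed
1015  CHECK(co.device_type == ExecutorDeviceType::GPU);
1016  if (!g_allow_query_step_cpu_retry || g_cluster) {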
1017  throw;
1018  }
1019  LOG(INFO) << "Retrying current query step " << i << " on CPU";
1020  const auto co_cpu = CompilationOptions::makeCpuOnly(co);
1021  if (render_info && i == interval.second - 1) {
1022  render_info->forceNonInSitu();
1023  }
1024  executeRelAlgStep(seq,
1025  i,
1026  co_cpu,
1027  eo,
1028  (i == interval.second - 1) ? render_info : nullptr,
1029  queue_time_ms);
1030  }
1031  }
1032 
1033  return seq.getDescriptor(interval.second - 1)->getResult();
1034 }
RaExecutionDesc * getDescriptor(size_t idx) const
void forceNonInSitu()
Definition: RenderInfo.cpp:46
#define LOG(tag)
Definition: Logger.h:285
TemporaryTables temporary_tables_
static CompilationOptions makeCpuOnly(const CompilationOptions &in)
#define INJECT_TIMER(DESC)
Definition: measure.h:122
ExecutionResult executeRelAlgSubSeq(const RaExecutionSequence &seq, const std::pair< size_t, size_t > interval, const CompilationOptions &co, const ExecutionOptions &eo, RenderInfo *render_info, const int64_t queue_time_ms)
void executeRelAlgStep(const RaExecutionSequence &seq, const size_t step_idx, const CompilationOptions &, const ExecutionOptions &, RenderInfo *, const int64_t queue_time_ms)
ExecutorDeviceType device_type
std::unordered_map< unsigned, JoinQualsPerNestingLevel > left_deep_join_info_
#define CHECK(condition)
Definition: Logger.h:291
bool g_allow_query_step_cpu_retry
Definition: Execute.cpp:94
bool g_cluster
Executor * executor_
DEVICE void swap(ARGS &&...args)
Definition: gpu_enabled.h:114

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

ExecutionResult RelAlgExecutor::executeSimpleInsert ( const Analyzer::Query insert_query,
Fragmenter_Namespace::InsertDataLoader inserter,
const Catalog_Namespace::SessionInfo session 
)

Definition at line 2868 of file RelAlgExecutor.cpp.

References append_datum(), DataBlockPtr::arraysPtr, CHECK, CHECK_EQ, checked_malloc(), Executor::clearExternalCaches(), Fragmenter_Namespace::InsertData::columnIds, ColumnDescriptor::columnType, CPU, Fragmenter_Namespace::InsertData::data, Fragmenter_Namespace::InsertData::databaseId, executor_, import_export::fill_missing_columns(), get_column_descriptor(), SQLTypeInfo::get_compression(), SQLTypeInfo::get_elem_type(), Analyzer::Query::get_result_col_list(), Analyzer::Query::get_result_table_id(), SQLTypeInfo::get_size(), SQLTypeInfo::get_type(), Analyzer::Query::get_values_lists(), Catalog_Namespace::SessionInfo::getCatalog(), Fragmenter_Namespace::InsertDataLoader::getLeafCount(), Catalog_Namespace::Catalog::getMetadataForTable(), inline_fixed_encoding_null_val(), anonymous_namespace{RelAlgExecutor.cpp}::insert_one_dict_str(), Fragmenter_Namespace::InsertDataLoader::insertData(), is_null(), SQLTypeInfo::is_string(), kARRAY, kBIGINT, kBOOLEAN, kCAST, kCHAR, kDATE, kDECIMAL, kDOUBLE, kENCODING_DICT, kENCODING_NONE, kFLOAT, kINT, kLINESTRING, kMULTILINESTRING, kMULTIPOINT, kMULTIPOLYGON, kNUMERIC, kPOINT, kPOLYGON, kSMALLINT, kTEXT, kTIME, kTIMESTAMP, kTINYINT, kVARCHAR, DataBlockPtr::numbersPtr, Fragmenter_Namespace::InsertData::numRows, anonymous_namespace{TypedDataAccessors.h}::put_null(), anonymous_namespace{TypedDataAccessors.h}::put_null_array(), DataBlockPtr::stringsPtr, Fragmenter_Namespace::InsertData::tableId, and to_string().

2871  {
2872  // Note: We currently obtain an executor for this method, but we do not need it.
2873  // Therefore, we skip the executor state setup in the regular execution path. In the
2874  // future, we will likely want to use the executor to evaluate expressions in the insert
2875  // statement.
2876 
2877  const auto& values_lists = query.get_values_lists();
2878  const int table_id = query.get_result_table_id();
2879  const auto& col_id_list = query.get_result_col_list();
2880  size_t rows_number = values_lists.size();
2881  size_t leaf_count = inserter.getLeafCount();
2882  const auto& catalog = session.getCatalog();
2883  const auto td = catalog.getMetadataForTable(table_id);
2884  CHECK(td);
2885  size_t rows_per_leaf = rows_number;
2886  if (td->nShards == 0) {
2887  rows_per_leaf =
2888  ceil(static_cast<double>(rows_number) / static_cast<double>(leaf_count));
2889  }
2890  auto max_number_of_rows_per_package =
2891  std::max(size_t(1), std::min(rows_per_leaf, size_t(64 * 1024)));
2892 
2893  std::vector<const ColumnDescriptor*> col_descriptors;
2894  std::vector<int> col_ids;
2895  std::unordered_map<int, std::unique_ptr<uint8_t[]>> col_buffers;
2896  std::unordered_map<int, std::vector<std::string>> str_col_buffers;
2897  std::unordered_map<int, std::vector<ArrayDatum>> arr_col_buffers;
2898  std::unordered_map<int, int> sequential_ids;
2899 
2900  for (const int col_id : col_id_list) {
2901  const auto cd = get_column_descriptor({catalog.getDatabaseId(), table_id, col_id});
2902  const auto col_enc = cd->columnType.get_compression();
2903  if (cd->columnType.is_string()) {
2904  switch (col_enc) {
2905  case kENCODING_NONE: {
2906  auto it_ok =
2907  str_col_buffers.insert(std::make_pair(col_id, std::vector<std::string>{}));
2908  CHECK(it_ok.second);
2909  break;
2910  }
2911  case kENCODING_DICT: {
2912  const auto dd = catalog.getMetadataForDict(cd->columnType.get_comp_param());
2913  CHECK(dd);
2914  const auto it_ok = col_buffers.emplace(
2915  col_id,
2916  std::make_unique<uint8_t[]>(cd->columnType.get_size() *
2917  max_number_of_rows_per_package));
2918  CHECK(it_ok.second);
2919  break;
2920  }
2921  default:
2922  CHECK(false);
2923  }
2924  } else if (cd->columnType.is_geometry()) {
2925  auto it_ok =
2926  str_col_buffers.insert(std::make_pair(col_id, std::vector<std::string>{}));
2927  CHECK(it_ok.second);
2928  } else if (cd->columnType.is_array()) {
2929  auto it_ok =
2930  arr_col_buffers.insert(std::make_pair(col_id, std::vector<ArrayDatum>{}));
2931  CHECK(it_ok.second);
2932  } else {
2933  const auto it_ok = col_buffers.emplace(
2934  col_id,
2935  std::unique_ptr<uint8_t[]>(new uint8_t[cd->columnType.get_logical_size() *
2936  max_number_of_rows_per_package]()));
2937  CHECK(it_ok.second);
2938  }
2939  col_descriptors.push_back(cd);
2940  sequential_ids[col_id] = col_ids.size();
2941  col_ids.push_back(col_id);
2942  }
2943 
2944  Executor::clearExternalCaches(true, td, catalog.getDatabaseId());
2945 
2946  size_t start_row = 0;
2947  size_t rows_left = rows_number;
2948  while (rows_left != 0) {
2949  // clear the buffers
2950  for (const auto& kv : col_buffers) {
2951  memset(kv.second.get(), 0, max_number_of_rows_per_package);
2952  }
2953  for (auto& kv : str_col_buffers) {
2954  kv.second.clear();
2955  }
2956  for (auto& kv : arr_col_buffers) {
2957  kv.second.clear();
2958  }
2959 
2960  auto package_size = std::min(rows_left, max_number_of_rows_per_package);
2961  // Note: if there will be use cases with batch inserts with lots of rows, it might be
2962  // more efficient to do the loops below column by column instead of row by row.
2963  // But for now I consider such a refactoring not worth investigating, as we have more
2964  // efficient ways to insert many rows anyway.
2965  for (size_t row_idx = 0; row_idx < package_size; ++row_idx) {
2966  const auto& values_list = values_lists[row_idx + start_row];
2967  for (size_t col_idx = 0; col_idx < col_descriptors.size(); ++col_idx) {
2968  CHECK(values_list.size() == col_descriptors.size());
2969  auto col_cv =
2970  dynamic_cast<const Analyzer::Constant*>(values_list[col_idx]->get_expr());
2971  if (!col_cv) {
2972  auto col_cast =
2973  dynamic_cast<const Analyzer::UOper*>(values_list[col_idx]->get_expr());
2974  CHECK(col_cast);
2975  CHECK_EQ(kCAST, col_cast->get_optype());
2976  col_cv = dynamic_cast<const Analyzer::Constant*>(col_cast->get_operand());
2977  }
2978  CHECK(col_cv);
2979  const auto cd = col_descriptors[col_idx];
2980  auto col_datum = col_cv->get_constval();
2981  auto col_type = cd->columnType.get_type();
2982  uint8_t* col_data_bytes{nullptr};
2983  if (!cd->columnType.is_array() && !cd->columnType.is_geometry() &&
2984  (!cd->columnType.is_string() ||
2985  cd->columnType.get_compression() == kENCODING_DICT)) {
2986  const auto col_data_bytes_it = col_buffers.find(col_ids[col_idx]);
2987  CHECK(col_data_bytes_it != col_buffers.end());
2988  col_data_bytes = col_data_bytes_it->second.get();
2989  }
2990  switch (col_type) {
2991  case kBOOLEAN: {
2992  auto col_data = reinterpret_cast<int8_t*>(col_data_bytes);
2993  auto null_bool_val =
2994  col_datum.boolval == inline_fixed_encoding_null_val(cd->columnType);
2995  col_data[row_idx] = col_cv->get_is_null() || null_bool_val
2996  ? inline_fixed_encoding_null_val(cd->columnType)
2997  : (col_datum.boolval ? 1 : 0);
2998  break;
2999  }
3000  case kTINYINT: {
3001  auto col_data = reinterpret_cast<int8_t*>(col_data_bytes);
3002  col_data[row_idx] = col_cv->get_is_null()
3003  ? inline_fixed_encoding_null_val(cd->columnType)
3004  : col_datum.tinyintval;
3005  break;
3006  }
3007  case kSMALLINT: {
3008  auto col_data = reinterpret_cast<int16_t*>(col_data_bytes);
3009  col_data[row_idx] = col_cv->get_is_null()
3010  ? inline_fixed_encoding_null_val(cd->columnType)
3011  : col_datum.smallintval;
3012  break;
3013  }
3014  case kINT: {
3015  auto col_data = reinterpret_cast<int32_t*>(col_data_bytes);
3016  col_data[row_idx] = col_cv->get_is_null()
3017  ? inline_fixed_encoding_null_val(cd->columnType)
3018  : col_datum.intval;
3019  break;
3020  }
3021  case kBIGINT:
3022  case kDECIMAL:
3023  case kNUMERIC: {
3024  auto col_data = reinterpret_cast<int64_t*>(col_data_bytes);
3025  col_data[row_idx] = col_cv->get_is_null()
3026  ? inline_fixed_encoding_null_val(cd->columnType)
3027  : col_datum.bigintval;
3028  break;
3029  }
3030  case kFLOAT: {
3031  auto col_data = reinterpret_cast<float*>(col_data_bytes);
3032  col_data[row_idx] = col_datum.floatval;
3033  break;
3034  }
3035  case kDOUBLE: {
3036  auto col_data = reinterpret_cast<double*>(col_data_bytes);
3037  col_data[row_idx] = col_datum.doubleval;
3038  break;
3039  }
3040  case kTEXT:
3041  case kVARCHAR:
3042  case kCHAR: {
3043  switch (cd->columnType.get_compression()) {
3044  case kENCODING_NONE:
3045  str_col_buffers[col_ids[col_idx]].push_back(
3046  col_datum.stringval ? *col_datum.stringval : "");
3047  break;
3048  case kENCODING_DICT: {
3049  switch (cd->columnType.get_size()) {
3050  case 1:
3051  insert_one_dict_str(
3052  &reinterpret_cast<uint8_t*>(col_data_bytes)[row_idx],
3053  cd,
3054  col_cv,
3055  catalog);
3056  break;
3057  case 2:
3058  insert_one_dict_str(
3059  &reinterpret_cast<uint16_t*>(col_data_bytes)[row_idx],
3060  cd,
3061  col_cv,
3062  catalog);
3063  break;
3064  case 4:
3065  insert_one_dict_str(
3066  &reinterpret_cast<int32_t*>(col_data_bytes)[row_idx],
3067  cd,
3068  col_cv,
3069  catalog);
3070  break;
3071  default:
3072  CHECK(false);
3073  }
3074  break;
3075  }
3076  default:
3077  CHECK(false);
3078  }
3079  break;
3080  }
3081  case kTIME:
3082  case kTIMESTAMP:
3083  case kDATE: {
3084  auto col_data = reinterpret_cast<int64_t*>(col_data_bytes);
3085  col_data[row_idx] = col_cv->get_is_null()
3086  ? inline_fixed_encoding_null_val(cd->columnType)
3087  : col_datum.bigintval;
3088  break;
3089  }
3090  case kARRAY: {
3091  const auto is_null = col_cv->get_is_null();
3092  const auto size = cd->columnType.get_size();
3093  const SQLTypeInfo elem_ti = cd->columnType.get_elem_type();
3094  // POINT coords: [un]compressed coords always need to be encoded, even if NULL
3095  const auto is_point_coords =
3096  (cd->isGeoPhyCol && elem_ti.get_type() == kTINYINT);
3097  if (is_null && !is_point_coords) {
3098  if (size > 0) {
3099  int8_t* buf = (int8_t*)checked_malloc(size);
3100  put_null_array(static_cast<void*>(buf), elem_ti, "");
3101  for (int8_t* p = buf + elem_ti.get_size(); (p - buf) < size;
3102  p += elem_ti.get_size()) {
3103  put_null(static_cast<void*>(p), elem_ti, "");
3104  }
3105  arr_col_buffers[col_ids[col_idx]].emplace_back(size, buf, is_null);
3106  } else {
3107  arr_col_buffers[col_ids[col_idx]].emplace_back(0, nullptr, is_null);
3108  }
3109  break;
3110  }
3111  const auto l = col_cv->get_value_list();
3112  size_t len = l.size() * elem_ti.get_size();
3113  if (size > 0 && static_cast<size_t>(size) != len) {
3114  throw std::runtime_error("Array column " + cd->columnName + " expects " +
3115  std::to_string(size / elem_ti.get_size()) +
3116  " values, " + "received " +
3117  std::to_string(l.size()));
3118  }
3119  if (elem_ti.is_string()) {
3120  CHECK(kENCODING_DICT == elem_ti.get_compression());
3121  CHECK(4 == elem_ti.get_size());
3122 
3123  int8_t* buf = (int8_t*)checked_malloc(len);
3124  int32_t* p = reinterpret_cast<int32_t*>(buf);
3125 
3126  int elemIndex = 0;
3127  for (auto& e : l) {
3128  auto c = std::dynamic_pointer_cast<Analyzer::Constant>(e);
3129  CHECK(c);
3130  insert_one_dict_str(
3131  &p[elemIndex], cd->columnName, elem_ti, c.get(), catalog);
3132  elemIndex++;
3133  }
3134  arr_col_buffers[col_ids[col_idx]].push_back(ArrayDatum(len, buf, is_null));
3135 
3136  } else {
3137  int8_t* buf = (int8_t*)checked_malloc(len);
3138  int8_t* p = buf;
3139  for (auto& e : l) {
3140  auto c = std::dynamic_pointer_cast<Analyzer::Constant>(e);
3141  CHECK(c);
3142  p = append_datum(p, c->get_constval(), elem_ti);
3143  CHECK(p);
3144  }
3145  arr_col_buffers[col_ids[col_idx]].push_back(ArrayDatum(len, buf, is_null));
3146  }
3147  break;
3148  }
3149  case kPOINT:
3150  case kMULTIPOINT:
3151  case kLINESTRING:
3152  case kMULTILINESTRING:
3153  case kPOLYGON:
3154  case kMULTIPOLYGON:
3155  if (col_datum.stringval && col_datum.stringval->empty()) {
3156  throw std::runtime_error(
3157  "Empty values are not allowed for geospatial column \"" +
3158  cd->columnName + "\"");
3159  } else {
3160  str_col_buffers[col_ids[col_idx]].push_back(
3161  col_datum.stringval ? *col_datum.stringval : "");
3162  }
3163  break;
3164  default:
3165  CHECK(false);
3166  }
3167  }
3168  }
3169  start_row += package_size;
3170  rows_left -= package_size;
3171 
3172  Fragmenter_Namespace::InsertData insert_data;
3173  insert_data.databaseId = catalog.getCurrentDB().dbId;
3174  insert_data.tableId = table_id;
3175  insert_data.data.resize(col_ids.size());
3176  insert_data.columnIds = col_ids;
3177  for (const auto& kv : col_buffers) {
3178  DataBlockPtr p;
3179  p.numbersPtr = reinterpret_cast<int8_t*>(kv.second.get());
3180  insert_data.data[sequential_ids[kv.first]] = p;
3181  }
3182  for (auto& kv : str_col_buffers) {
3183  DataBlockPtr p;
3184  p.stringsPtr = &kv.second;
3185  insert_data.data[sequential_ids[kv.first]] = p;
3186  }
3187  for (auto& kv : arr_col_buffers) {
3188  DataBlockPtr p;
3189  p.arraysPtr = &kv.second;
3190  insert_data.data[sequential_ids[kv.first]] = p;
3191  }
3192  insert_data.numRows = package_size;
3193  auto data_memory_holder = import_export::fill_missing_columns(&catalog, insert_data);
3194  inserter.insertData(session, insert_data);
3195  }
3196 
3197  auto rs = std::make_shared<ResultSet>(TargetInfoList{},
3198  ExecutorDeviceType::CPU,
3199  QueryMemoryDescriptor(),
3200  executor_->getRowSetMemoryOwner(),
3201  0,
3202  0);
3203  std::vector<TargetMetaInfo> empty_targets;
3204  return {rs, empty_targets};
3205 }
#define CHECK_EQ(x, y)
Definition: Logger.h:301
HOST DEVICE int get_size() const
Definition: sqltypes.h:403
int8_t * append_datum(int8_t *buf, const Datum &d, const SQLTypeInfo &ti)
Definition: Datum.cpp:580
Definition: sqltypes.h:76
std::vector< std::string > * stringsPtr
Definition: sqltypes.h:234
std::vector< ArrayDatum > * arraysPtr
Definition: sqltypes.h:235
std::vector< std::unique_ptr< TypedImportBuffer > > fill_missing_columns(const Catalog_Namespace::Catalog *cat, Fragmenter_Namespace::InsertData &insert_data)
Definition: Importer.cpp:6217
int64_t insert_one_dict_str(T *col_data, const std::string &columnName, const SQLTypeInfo &columnType, const Analyzer::Constant *col_cv, const Catalog_Namespace::Catalog &catalog)
Definition: sqldefs.h:51
std::vector< TargetInfo > TargetInfoList
HOST DEVICE SQLTypes get_type() const
Definition: sqltypes.h:391
void insertData(const Catalog_Namespace::SessionInfo &session_info, InsertData &insert_data)
std::string to_string(char const *&&v)
int tableId
identifies the database into which the data is being inserted
Definition: Fragmenter.h:70
std::conditional_t< is_cuda_compiler(), DeviceArrayDatum, HostArrayDatum > ArrayDatum
Definition: sqltypes.h:229
size_t numRows
a vector of column ids for the row(s) being inserted
Definition: Fragmenter.h:72
CONSTEXPR DEVICE bool is_null(const T &value)
void * checked_malloc(const size_t size)
Definition: checked_alloc.h:45
void put_null_array(void *ndptr, const SQLTypeInfo &ntype, const std::string col_name)
const ColumnDescriptor * get_column_descriptor(const shared::ColumnKey &column_key)
Definition: Execute.h:213
void put_null(void *ndptr, const SQLTypeInfo &ntype, const std::string col_name)
Definition: sqltypes.h:79
Definition: sqltypes.h:80
HOST DEVICE EncodingType get_compression() const
Definition: sqltypes.h:399
std::vector< DataBlockPtr > data
the number of rows being inserted
Definition: Fragmenter.h:73
Catalog & getCatalog() const
Definition: SessionInfo.h:75
Definition: sqltypes.h:68
#define CHECK(condition)
Definition: Logger.h:291
static void clearExternalCaches(bool for_update, const TableDescriptor *td, const int current_db_id)
Definition: Execute.h:438
int64_t inline_fixed_encoding_null_val(const SQL_TYPE_INFO &ti)
The data to be inserted using the fragment manager.
Definition: Fragmenter.h:68
Definition: sqltypes.h:72
SQLTypeInfo columnType
const TableDescriptor * getMetadataForTable(const std::string &tableName, const bool populateFragmenter=true) const
Returns a pointer to a const TableDescriptor struct matching the provided tableName.
bool is_string() const
Definition: sqltypes.h:561
int8_t * numbersPtr
Definition: sqltypes.h:233
SQLTypeInfo get_elem_type() const
Definition: sqltypes.h:977
std::vector< int > columnIds
identifies the table into which the data is being inserted
Definition: Fragmenter.h:71
Executor * executor_

+ Here is the call graph for this function:

ExecutionResult RelAlgExecutor::executeSort ( const RelSort * sort,
const CompilationOptions & co,
const ExecutionOptions & eo,
RenderInfo * render_info,
const int64_t  queue_time_ms 
)
private

Definition at line 3232 of file RelAlgExecutor.cpp.

References SpeculativeTopNBlacklist::add(), addTemporaryTable(), anonymous_namespace{QueryMemoryDescriptor.cpp}::any_of(), QueryPlanDagExtractor::applyLimitClauseToCacheKey(), build_render_targets(), canUseResultsetCache(), CHECK, CHECK_EQ, anonymous_namespace{RelAlgExecutor.cpp}::check_sort_node_source_constraint(), RelSort::collationCount(), CPU, createSortInputWorkUnit(), DEBUG_TIMER, CompilationOptions::device_type, executeWorkUnit(), executor_, anonymous_namespace{RelAlgExecutor.cpp}::first_oe_is_desc(), g_cluster, anonymous_namespace{RelAlgExecutor.cpp}::get_limit_value(), ExecutionResult::getDataPtr(), RelAlgNode::getId(), RelAlgNode::getInput(), RelSort::getLimit(), RelSort::getOffset(), RelSort::getOrderEntries(), GPU, anonymous_namespace{RelAlgExecutor.cpp}::has_valid_query_plan_dag(), hasStepForUnion(), anonymous_namespace{RelAlgExecutor.cpp}::is_none_encoded_text(), RelSort::isEmptyResult(), ExecutionOptions::just_validate, leaf_results_, anonymous_namespace{RelAlgExecutor.cpp}::node_is_aggregate(), run_benchmark_import::result, RelAlgNode::setOutputMetainfo(), gpu_enabled::sort(), speculative_topn_blacklist_, SpeculativeTopN, temporary_tables_, use_speculative_top_n(), and VLOG.

Referenced by executeRelAlgStep().

// RelAlgExecutor::executeSort -- body listing (the Doxygen member signature precedes it).
// Produces the result of a RelSort node. Two paths are visible:
//   * if leaf_results_ already holds an entry for this node's id, that aggregated
//     result is sorted/trimmed in place and returned;
//   * otherwise the sort's input is fetched (result-set recycler or executeWorkUnit)
//     and then sorted/trimmed inside the execute_sort_query lambda.
// A SpeculativeTopNFailed exception thrown by the lambda triggers one retry after the
// single group-by expression has been added to speculative_topn_blacklist_.
// NOTE(review): the embedded listing numbers skip 3238, 3302 and 3333, so three
// statements of the upstream function are not shown in this extract.
3236  {
3237  auto timer = DEBUG_TIMER(__func__);
// NOTE(review): listing line 3238 is missing here. The "References" list for this
// member names check_sort_node_source_constraint(), which no visible line calls --
// presumably that call sits on the missing line; confirm against RelAlgExecutor.cpp.
3239  const auto source = sort->getInput(0);
3240  const bool is_aggregate = node_is_aggregate(source);
3241  auto it = leaf_results_.find(sort->getId());
3242  auto order_entries = sort->getOrderEntries();
// Path 1: a result for this sort node is already present in leaf_results_.
3243  if (it != leaf_results_.end()) {
3244  // Add any transient string literals to the sdp on the agg
3245  const auto source_work_unit = createSortInputWorkUnit(sort, order_entries, eo);
3246  executor_->addTransientStringLiterals(source_work_unit.exe_unit,
3247  executor_->row_set_mem_owner_);
3248  // Handle push-down for LIMIT for multi-node
3249  auto& aggregated_result = it->second;
3250  auto& result_rows = aggregated_result.rs;
3251  auto limit = sort->getLimit();
3252  const size_t offset = sort->getOffset();
// Only touch the rows when a LIMIT or a non-zero OFFSET was requested.
3253  if (limit || offset) {
// Sort only if there is something to order by; get_limit_value(limit) + offset is
// passed as the row-count argument of sort().
3254  if (!order_entries.empty()) {
3255  result_rows->sort(
3256  order_entries, get_limit_value(limit) + offset, co.device_type, executor_);
3257  }
// Apply OFFSET first, then LIMIT.
3258  result_rows->dropFirstN(offset);
3259  if (limit) {
3260  result_rows->keepFirstN(get_limit_value(limit));
3261  }
3262  }
3263 
3264  if (render_info) {
3265  // We've hit a sort step that is the very last step
3266  // in a distributed render query. We'll fill in the render targets
3267  // since we have all that data needed to do so. This is normally
3268  // done in executeWorkUnit, but that is bypassed in this case.
3269  build_render_targets(*render_info,
3270  source_work_unit.exe_unit.target_exprs,
3271  aggregated_result.targets_meta);
3272  }
3273 
3274  ExecutionResult result(result_rows, aggregated_result.targets_meta);
3275  sort->setOutputMetainfo(aggregated_result.targets_meta);
3276 
3277  return result;
3278  }
3279 
// Path 2 state. These three locals are captured by reference by the lambda below;
// groupby_exprs and is_desc are written inside it and read again in the catch block.
3280  std::list<std::shared_ptr<Analyzer::Expr>> groupby_exprs;
3281  bool is_desc{false};
3282  bool use_speculative_top_n_sort{false};
3283 
// execute_sort_query: obtains the sort input, then sorts and applies OFFSET/LIMIT.
// It is invoked once, and a second time if the first attempt throws
// SpeculativeTopNFailed (see the try/catch at the end of the function).
3284  auto execute_sort_query = [this,
3285  sort,
3286  &source,
3287  &is_aggregate,
3288  &eo,
3289  &co,
3290  render_info,
3291  queue_time_ms,
3292  &groupby_exprs,
3293  &is_desc,
3294  &order_entries,
3295  &use_speculative_top_n_sort]() -> ExecutionResult {
3296  std::optional<size_t> limit = sort->getLimit();
3297  const size_t offset = sort->getOffset();
3298  // check whether sort's input is cached
3299  auto source_node = sort->getInput(0);
3300  CHECK(source_node);
// Empty placeholder; a null data pointer below means "no cached input was found".
3301  ExecutionResult source_result{nullptr, {}};
// NOTE(review): listing line 3302 is missing here; `sort_algorithm` used on the next
// line has no visible declaration, so it is presumably declared there -- confirm.
3303  SortInfo sort_info{order_entries, sort_algorithm, limit, offset};
// The cache key is derived from the input node's plan hash combined with sort_info.
3304  auto source_query_plan_dag = QueryPlanDagExtractor::applyLimitClauseToCacheKey(
3305  source_node->getQueryPlanDagHash(), sort_info);
3306  if (canUseResultsetCache(eo, render_info) && has_valid_query_plan_dag(source_node) &&
3307  !sort->isEmptyResult()) {
3308  if (auto cached_resultset =
3309  executor_->getResultSetRecyclerHolder().getCachedQueryResultSet(
3310  source_query_plan_dag)) {
// The value is CHECKed here and dereferenced with `*` a few lines below, so it is
// presumably an optional-like type -- confirm against ResultSet.
3311  CHECK(cached_resultset->canUseSpeculativeTopNSort());
3312  VLOG(1) << "recycle resultset of the root node " << source_node->getRelNodeDagId()
3313  << " from resultset cache";
3314  source_result =
3315  ExecutionResult{cached_resultset, cached_resultset->getTargetMetaInfo()};
// Register the cached rows as a temporary table keyed by the negated node id,
// unless an entry with that key already exists.
3316  if (temporary_tables_.find(-source_node->getId()) == temporary_tables_.end()) {
3317  addTemporaryTable(-source_node->getId(), cached_resultset);
3318  }
3319  use_speculative_top_n_sort = *cached_resultset->canUseSpeculativeTopNSort() &&
3320  co.device_type == ExecutorDeviceType::GPU;
3321  source_node->setOutputMetainfo(cached_resultset->getTargetMetaInfo());
3322  sort->setOutputMetainfo(source_node->getOutputMetainfo());
3323  }
3324  }
// No cached input: build the sort-input work unit and execute it.
3325  if (!source_result.getDataPtr()) {
3326  const auto source_work_unit = createSortInputWorkUnit(sort, order_entries, eo);
3327  is_desc = first_oe_is_desc(order_entries);
3328  ExecutionOptions eo_copy = eo;
3329  CompilationOptions co_copy = co;
// A sort node flagged as producing an empty result is only validated, not executed.
3330  eo_copy.just_validate = eo.just_validate || sort->isEmptyResult();
3331  if (hasStepForUnion() &&
3332  boost::algorithm::any_of(source->getOutputMetainfo(), is_none_encoded_text)) {
// NOTE(review): listing line 3333 is missing here. The log message below says the
// input query is punted to CPU, but the statement that does so (presumably an
// update of co_copy) is not part of this extract -- confirm.
3334  VLOG(1) << "Punt sort's input query to CPU: detect union(-all) of none-encoded "
3335  "text column";
3336  }
3337 
// Remember the group-by expressions so the catch block can blacklist them.
3338  groupby_exprs = source_work_unit.exe_unit.groupby_exprs;
3339  source_result = executeWorkUnit(source_work_unit,
3340  source->getOutputMetainfo(),
3341  is_aggregate,
3342  co_copy,
3343  eo_copy,
3344  render_info,
3345  queue_time_ms);
3346  use_speculative_top_n_sort =
3347  source_result.getDataPtr() && source_result.getRows()->hasValidBuffer() &&
3348  use_speculative_top_n(source_work_unit.exe_unit,
3349  source_result.getRows()->getQueryMemDesc());
3350  }
// In-situ render results and filter-push-down results are returned as they are,
// without the sort / OFFSET / LIMIT steps below.
3351  if (render_info && render_info->isInSitu()) {
3352  return source_result;
3353  }
3354  if (source_result.isFilterPushDownEnabled()) {
3355  return source_result;
3356  }
3357  auto rows_to_sort = source_result.getRows();
// For EXPLAIN the rows are returned with empty target metadata.
3358  if (eo.just_explain) {
3359  return {rows_to_sort, {}};
3360  }
3361  auto const limit_val = get_limit_value(limit);
// Explicit sort is skipped when there is no collation, when the result is known
// to be empty, or when the speculative top-n path was selected above.
3362  if (sort->collationCount() != 0 && !rows_to_sort->definitelyHasNoRows() &&
3363  !use_speculative_top_n_sort) {
3364  const size_t top_n = limit_val + offset;
3365  rows_to_sort->sort(order_entries, top_n, co.device_type, executor_);
3366  }
3367  if (limit || offset) {
// With g_cluster set and no collation: drop the first `offset` rows when offset is
// at or past the row count; otherwise keep limit_val + offset rows. Note that the
// offset rows are not dropped on the `else` side of this branch.
3368  if (g_cluster && sort->collationCount() == 0) {
3369  if (offset >= rows_to_sort->rowCount()) {
3370  rows_to_sort->dropFirstN(offset);
3371  } else {
3372  rows_to_sort->keepFirstN(limit_val + offset);
3373  }
3374  } else {
// General case: apply OFFSET first, then LIMIT.
3375  rows_to_sort->dropFirstN(offset);
3376  if (limit) {
3377  rows_to_sort->keepFirstN(limit_val);
3378  }
3379  }
3380  }
3381  return {rows_to_sort, source_result.getTargetsMeta()};
3382  };
3383 
// First attempt; on SpeculativeTopNFailed, blacklist the (single) group-by
// expression together with the sort direction and run the lambda once more.
3384  try {
3385  return execute_sort_query();
3386  } catch (const SpeculativeTopNFailed& e) {
3387  CHECK_EQ(size_t(1), groupby_exprs.size());
3388  CHECK(groupby_exprs.front());
3389  speculative_topn_blacklist_.add(groupby_exprs.front(), is_desc);
3390  return execute_sort_query();
3391  }
3392 }
void setOutputMetainfo(std::vector< TargetMetaInfo > targets_metainfo) const
Definition: RelAlgDag.h:850
#define CHECK_EQ(x, y)
Definition: Logger.h:301
size_t getOffset() const
Definition: RelAlgDag.h:2228
bool has_valid_query_plan_dag(const RelAlgNode *node)
size_t get_limit_value(std::optional< size_t > limit)
std::list< Analyzer::OrderEntry > getOrderEntries() const
Definition: RelAlgDag.h:2264
static size_t applyLimitClauseToCacheKey(size_t cache_key, SortInfo const &sort_info)
bool is_none_encoded_text(TargetMetaInfo const &target_meta_info)
static SpeculativeTopNBlacklist speculative_topn_blacklist_
TemporaryTables temporary_tables_
bool use_speculative_top_n(const RelAlgExecutionUnit &ra_exe_unit, const QueryMemoryDescriptor &query_mem_desc)
DEVICE void sort(ARGS &&...args)
Definition: gpu_enabled.h:105
void addTemporaryTable(const int table_id, const ResultSetPtr &result)
void check_sort_node_source_constraint(const RelSort *sort)
std::unordered_map< unsigned, AggregatedResult > leaf_results_
const ResultSetPtr & getDataPtr() const
bool hasStepForUnion() const
unsigned getId() const
Definition: RelAlgDag.h:869
void build_render_targets(RenderInfo &render_info, const std::vector< Analyzer::Expr * > &work_unit_target_exprs, const std::vector< TargetMetaInfo > &targets_meta)
const RelAlgNode * getInput(const size_t idx) const
Definition: RelAlgDag.h:877
ExecutorDeviceType device_type
bool node_is_aggregate(const RelAlgNode *ra)
bool isEmptyResult() const
Definition: RelAlgDag.h:2222
ExecutionResult executeWorkUnit(const WorkUnit &work_unit, const std::vector< TargetMetaInfo > &targets_meta, const bool is_agg, const CompilationOptions &co_in, const ExecutionOptions &eo_in, RenderInfo *, const int64_t queue_time_ms, const std::optional< size_t > previous_count=std::nullopt)
size_t collationCount() const
Definition: RelAlgDag.h:2211
bool canUseResultsetCache(const ExecutionOptions &eo, RenderInfo *render_info) const
#define CHECK(condition)
Definition: Logger.h:291
#define DEBUG_TIMER(name)
Definition: Logger.h:412
bool g_cluster
void add(const std::shared_ptr< Analyzer::Expr > expr, const bool desc)
bool any_of(std::vector< Analyzer::Expr * > const &target_exprs)
std::optional< size_t > getLimit() const
Definition: RelAlgDag.h:2226
Executor * executor_
#define VLOG(n)
Definition: Logger.h:388
bool first_oe_is_desc(const std::list< Analyzer::OrderEntry > &order_entries)
WorkUnit createSortInputWorkUnit(const RelSort *, std::list< Analyzer::OrderEntry > &order_entries, const ExecutionOptions &eo)

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

ExecutionResult RelAlgExecutor::executeTableFunction ( const RelTableFunction * table_func,
const CompilationOptions & co_in,
const ExecutionOptions & eo,
const int64_t  queue_time_ms 
)
private

Definition at line 2363 of file RelAlgExecutor.cpp.

References addTemporaryTable(), canUseResultsetCache(), CHECK, createTableFunctionWorkUnit(), DEBUG_TIMER, executor_, g_allow_auto_resultset_caching, g_auto_resultset_caching_threshold, g_cluster, g_enable_table_functions, get_table_infos(), QueryExecutionError::getErrorCode(), getGlobalQueryHint(), RelAlgNode::getQueryPlanDagHash(), RelAlgNode::getRelNodeDagId(), ScanNodeTableKeyCollector::getScanNodeTableKey(), GPU, handlePersistentError(), anonymous_namespace{RelAlgExecutor.cpp}::has_valid_query_plan_dag(), QueryExecutionError::hasErrorCode(), hasStepForUnion(), INJECT_TIMER, is_validate_or_explain_query(), ExecutionOptions::just_explain, ExecutionOptions::keep_result, kKeepTableFuncResult, run_benchmark_import::result, target_exprs_owned_, timer_start(), timer_stop(), and VLOG.

Referenced by executeRelAlgStep().

// RelAlgExecutor::executeTableFunction -- body listing (the Doxygen member signature
// precedes it).
// Executes a table-function node: rejects the call when g_cluster is set or table
// functions are disabled, returns a recycled result set when one is cached for the
// node's plan hash, otherwise runs executor_->executeTableFunction() and optionally
// puts the fresh result set into the result-set recycler.
// Throws std::runtime_error for the two rejected configurations and when a
// QueryExecutionError is caught during execution.
// NOTE(review): the embedded listing numbers skip 2367, 2390, 2418, 2433, 2435 and
// 2446, so several statements/arguments of the upstream function are not shown here.
2366  {
// NOTE(review): listing line 2367 is missing here. The "References" list for this
// member names INJECT_TIMER, which no visible line uses -- presumably it sits on the
// missing line; confirm against RelAlgExecutor.cpp.
2368  auto timer = DEBUG_TIMER(__func__);
2369 
// Local copy of the compilation options; the visible lines only read it.
2370  auto co = co_in;
2371 
2372  if (g_cluster) {
2373  throw std::runtime_error("Table functions not supported in distributed mode yet");
2374  }
2375  if (!g_enable_table_functions) {
2376  throw std::runtime_error("Table function support is disabled");
2377  }
2378  auto table_func_work_unit = createTableFunctionWorkUnit(
2379  table_func,
2380  eo.just_explain,
2381  /*is_gpu = */ co.device_type == ExecutorDeviceType::GPU);
2382  const auto body = table_func_work_unit.body;
2383  CHECK(body);
2384 
2385  const auto table_infos =
2386  get_table_infos(table_func_work_unit.exe_unit.input_descs, executor_);
2387 
// Placeholder result built from an empty target list; it is overwritten below by
// either the cached result set or the freshly executed one.
// NOTE(review): listing line 2390 (one constructor argument between
// co.device_type and nullptr) is missing from this extract.
2388  ExecutionResult result{std::make_shared<ResultSet>(std::vector<TargetInfo>{},
2389  co.device_type,
2391  nullptr,
2392  executor_->blockSize(),
2393  executor_->gridSize()),
2394  {}};
2395 
2396  auto const global_hint = getGlobalQueryHint();
2397  auto const use_resultset_recycler =
2398  canUseResultsetCache(eo, nullptr) && has_valid_query_plan_dag(table_func);
// Cache lookup keyed by the table-function node's query plan DAG hash.
2399  if (use_resultset_recycler) {
2400  auto cached_resultset =
2401  executor_->getResultSetRecyclerHolder().getCachedQueryResultSet(
2402  table_func->getQueryPlanDagHash());
2403  if (cached_resultset) {
2404  VLOG(1) << "recycle table function's resultset of the root node "
2405  << table_func->getRelNodeDagId() << " from resultset cache";
2406  result = {cached_resultset, cached_resultset->getTargetMetaInfo()};
// Cache hit: register the rows as a temporary table keyed by the negated id of
// the work unit's body node and return early.
2407  addTemporaryTable(-body->getId(), result.getDataPtr());
2408  return result;
2409  }
2410  }
2411 
// Cache miss (or recycler not usable): execute and time the table function.
2412  auto query_exec_time_begin = timer_start();
2413  try {
2414  result = {executor_->executeTableFunction(
2415  table_func_work_unit.exe_unit, table_infos, co, eo),
2416  body->getOutputMetainfo()};
2417  } catch (const QueryExecutionError& e) {
// NOTE(review): listing line 2418 is missing here. The "References" list names
// handlePersistentError(), which no visible line calls -- presumably it is invoked
// on the missing line; confirm.
// The CHECK below asserts that the error code is OUT_OF_GPU_MEM before the error
// is converted into a std::runtime_error.
2419  CHECK(e.hasErrorCode(ErrorCode::OUT_OF_GPU_MEM)) << e.getErrorCode();
2420  throw std::runtime_error("Table function ran out of memory during execution");
2421  }
2422  auto query_exec_time = timer_stop(query_exec_time_begin);
2423  result.setQueueTime(queue_time_ms);
2424  auto resultset_ptr = result.getDataPtr();
2425  if (use_resultset_recycler) {
// Attach the metadata the recycler needs to the result set.
// NOTE(review): resultset_ptr is dereferenced on the next lines before the
// `resultset_ptr &&` null test further down, so that test cannot protect these
// calls -- confirm whether a null result set is possible here.
2426  resultset_ptr->setExecTime(query_exec_time);
2427  resultset_ptr->setQueryPlanHash(table_func_work_unit.exe_unit.query_plan_dag_hash);
2428  resultset_ptr->setTargetMetaInfo(body->getOutputMetainfo());
2429  auto input_table_keys = ScanNodeTableKeyCollector::getScanNodeTableKey(body);
2430  resultset_ptr->setInputTableKeys(std::move(input_table_keys));
// NOTE(review): listing lines 2433 and 2435 are missing, so part of this boolean
// expression is not visible. The "References" list names
// g_allow_auto_resultset_caching and g_auto_resultset_caching_threshold --
// presumably these are the missing operands; confirm.
2431  auto allow_auto_caching_resultset =
2432  resultset_ptr && resultset_ptr->hasValidBuffer() &&
2434  resultset_ptr->getBufferSizeBytes(co.device_type) <=
// Cache the result when the keep-table-function-result hint is registered or the
// automatic-caching condition above holds.
2436  if (global_hint->isHintRegistered(QueryHint::kKeepTableFuncResult) ||
2437  allow_auto_caching_resultset) {
2438  if (allow_auto_caching_resultset) {
2439  VLOG(1) << "Automatically keep table function's query resultset to recycler";
2440  }
// NOTE(review): listing line 2446 (the final argument of this call) is missing
// from this extract.
2441  executor_->getResultSetRecyclerHolder().putQueryResultSetToCache(
2442  table_func_work_unit.exe_unit.query_plan_dag_hash,
2443  resultset_ptr->getInputTableKeys(),
2444  resultset_ptr,
2445  resultset_ptr->getBufferSizeBytes(co.device_type),
2447  }
2448  } else {
// Recycler not usable: if the caller asked to keep the result, only log why the
// request is ignored. Nothing is cached on this path.
2449  if (eo.keep_result) {
// g_cluster already caused a throw at the top of this function, so this first
// branch is only reachable if the flag changes in between.
2450  if (g_cluster) {
2451  VLOG(1) << "Query hint \'keep_table_function_result\' is ignored since we do not "
2452  "support resultset recycling on distributed mode";
2453  } else if (hasStepForUnion()) {
2454  VLOG(1) << "Query hint \'keep_table_function_result\' is ignored since a query "
2455  "has union-(all) operator";
2456  } else if (is_validate_or_explain_query(eo)) {
2457  VLOG(1) << "Query hint \'keep_table_function_result\' is ignored since a query "
2458  "is either validate or explain query";
2459  } else {
2460  VLOG(1) << "Query hint \'keep_table_function_result\' is ignored";
2461  }
2462  }
2463  }
2464 
2465  return result;
2466 }
int32_t getErrorCode() const
Definition: ErrorHandling.h:63
bool has_valid_query_plan_dag(const RelAlgNode *node)
TableFunctionWorkUnit createTableFunctionWorkUnit(const RelTableFunction *table_func, const bool just_explain, const bool is_gpu)
bool is_validate_or_explain_query(const ExecutionOptions &eo)
TypeR::rep timer_stop(Type clock_begin)
Definition: measure.h:48
void addTemporaryTable(const int table_id, const ResultSetPtr &result)
static std::unordered_set< size_t > getScanNodeTableKey(RelAlgNode const *rel_alg_node)
bool hasStepForUnion() const
size_t getQueryPlanDagHash() const
Definition: RelAlgDag.h:863
std::vector< std::shared_ptr< Analyzer::Expr > > target_exprs_owned_
#define INJECT_TIMER(DESC)
Definition: measure.h:122
bool hasErrorCode(ErrorCode const ec) const
Definition: ErrorHandling.h:65
bool g_allow_auto_resultset_caching
Definition: Execute.cpp:162
std::optional< RegisteredQueryHint > getGlobalQueryHint()
ExecutionResult executeTableFunction(const RelTableFunction *, const CompilationOptions &, const ExecutionOptions &, const int64_t queue_time_ms)
bool canUseResultsetCache(const ExecutionOptions &eo, RenderInfo *render_info) const
static void handlePersistentError(const int32_t error_code)
#define CHECK(condition)
Definition: Logger.h:291
std::vector< InputTableInfo > get_table_infos(const std::vector< InputDescriptor > &input_descs, Executor *executor)
#define DEBUG_TIMER(name)
Definition: Logger.h:412
bool g_cluster
size_t getRelNodeDagId() const
Definition: RelAlgDag.h:921
Executor * executor_
#define VLOG(n)
Definition: Logger.h:388
Type timer_start()
Definition: measure.h:42
size_t g_auto_resultset_caching_threshold
Definition: Execute.cpp:168
bool g_enable_table_functions
Definition: Execute.cpp:121

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

ExecutionResult RelAlgExecutor::executeUnion ( const RelLogicalUnion * logical_union,
const RaExecutionSequence & seq,
const CompilationOptions & co,
const ExecutionOptions & eo,
RenderInfo * render_info,
const int64_t  queue_time_ms 
)
private

Definition at line 2735 of file RelAlgExecutor.cpp.

References anonymous_namespace{QueryMemoryDescriptor.cpp}::any_of(), createUnionWorkUnit(), DEBUG_TIMER, executeWorkUnit(), RelLogicalUnion::getCompatibleMetainfoTypes(), RelAlgNode::getOutputMetainfo(), RelLogicalUnion::isAll(), isGeometry(), CompilationOptions::makeCpuOnly(), and RelAlgNode::setOutputMetainfo().

Referenced by executeRelAlgStep().

// RelAlgExecutor::executeUnion -- body listing (the Doxygen member signature
// precedes it).
// Executes a UNION ALL node: sets the node's output metadata to the compatible
// types of its inputs, builds a union work unit and runs it through
// executeWorkUnit().
// Throws std::runtime_error for UNION without ALL and when any output column is a
// geometry column; getCompatibleMetainfoTypes() may also throw (see comment below).
// The `seq` parameter is not used by any visible line.
2740  {
2741  auto timer = DEBUG_TIMER(__func__);
2742  if (!logical_union->isAll()) {
2743  throw std::runtime_error("UNION without ALL is not supported yet.");
2744  }
2745  // Will throw a std::runtime_error if types don't match.
2746  logical_union->setOutputMetainfo(logical_union->getCompatibleMetainfoTypes());
2747  if (boost::algorithm::any_of(logical_union->getOutputMetainfo(), isGeometry)) {
2748  throw std::runtime_error("UNION does not support subqueries with geo-columns.");
2749  }
// The union work unit is created with a default-constructed SortInfo.
2750  auto work_unit = createUnionWorkUnit(logical_union, SortInfo(), eo);
// `false` is the is_agg argument of executeWorkUnit().
// NOTE(review): listing line 2754 (the CompilationOptions argument between `false`
// and `eo`) is missing from this extract. The "References" list names
// CompilationOptions::makeCpuOnly(), so the argument is presumably built with it --
// confirm against RelAlgExecutor.cpp.
2751  return executeWorkUnit(work_unit,
2752  logical_union->getOutputMetainfo(),
2753  false,
2755  eo,
2756  render_info,
2757  queue_time_ms);
2758 }
bool isAll() const
Definition: RelAlgDag.h:2770
void setOutputMetainfo(std::vector< TargetMetaInfo > targets_metainfo) const
Definition: RelAlgDag.h:850
static CompilationOptions makeCpuOnly(const CompilationOptions &in)
bool isGeometry(TargetMetaInfo const &target_meta_info)
WorkUnit createUnionWorkUnit(const RelLogicalUnion *, const SortInfo &, const ExecutionOptions &eo)
std::vector< TargetMetaInfo > getCompatibleMetainfoTypes() const
Definition: RelAlgDag.cpp:911
ExecutionResult executeWorkUnit(const WorkUnit &work_unit, const std::vector< TargetMetaInfo > &targets_meta, const bool is_agg, const CompilationOptions &co_in, const ExecutionOptions &eo_in, RenderInfo *, const int64_t queue_time_ms, const std::optional< size_t > previous_count=std::nullopt)
#define DEBUG_TIMER(name)
Definition: Logger.h:412
bool any_of(std::vector< Analyzer::Expr * > const &target_exprs)
const std::vector< TargetMetaInfo > & getOutputMetainfo() const
Definition: RelAlgDag.h:865

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void RelAlgExecutor::executeUpdate ( const RelAlgNode * node,
const CompilationOptions & co,
const ExecutionOptions & eo,
const int64_t  queue_time_ms 
)
private

Definition at line 2022 of file RelAlgExecutor.cpp.

References CompilationOptions::allow_lazy_fetch, CHECK, CHECK_EQ, Executor::clearExternalCaches(), computeWindow(), CPU, createCompoundWorkUnit(), createProjectWorkUnit(), DEBUG_TIMER, RelRexToStringConfig::defaults(), dml_transaction_parameters_, executor_, CompilationOptions::filter_on_deleted_column, get_table_infos(), get_temporary_table(), QueryExecutionError::getErrorCode(), getErrorMessageFromCode(), CompilationOptions::hoist_literals, leaf_results_, CompilationOptions::makeCpuOnly(), post_execution_callback_, StorageIOFacility::TransactionParameters::setInputSourceNode(), temporary_tables_, and StorageIOFacility::yieldUpdateCallback().

Referenced by executeRelAlgStep().

2025  {
2026  CHECK(node);
2027  auto timer = DEBUG_TIMER(__func__);
2028 
2029  auto co = co_in;
2030  co.hoist_literals = false; // disable literal hoisting as it interferes with dict
2031  // encoded string updates
2032 
2033  auto execute_update_for_node = [this, &co, &eo_in](const auto node,
2034  auto& work_unit,
2035  const bool is_aggregate) {
2036  auto table_descriptor = node->getModifiedTableDescriptor();
2037  CHECK(table_descriptor);
2038  if (node->isVarlenUpdateRequired() && !table_descriptor->hasDeletedCol) {
2039  throw std::runtime_error(
2040  "UPDATE queries involving variable length columns are only supported on tables "
2041  "with the vacuum attribute set to 'delayed'");
2042  }
2043 
2044  auto catalog = node->getModifiedTableCatalog();
2045  CHECK(catalog);
2046  Executor::clearExternalCaches(true, table_descriptor, catalog->getDatabaseId());
2047 
2049  std::make_unique<UpdateTransactionParameters>(table_descriptor,
2050  *catalog,
2051  node->getTargetColumns(),
2052  node->getOutputMetainfo(),
2053  node->isVarlenUpdateRequired());
2054 
2055  const auto table_infos = get_table_infos(work_unit.exe_unit, executor_);
2056 
2057  auto execute_update_ra_exe_unit =
2058  [this, &co, &eo_in, &table_infos, &table_descriptor, &node, catalog](
2059  const RelAlgExecutionUnit& ra_exe_unit, const bool is_aggregate) {
2061 
2062  auto eo = eo_in;
2063  if (dml_transaction_parameters_->tableIsTemporary()) {
2064  eo.output_columnar_hint = true;
2065  co_project.allow_lazy_fetch = false;
2066  co_project.filter_on_deleted_column =
2067  false; // project the entire delete column for columnar update
2068  }
2069 
2070  auto update_transaction_parameters = dynamic_cast<UpdateTransactionParameters*>(
2071  dml_transaction_parameters_.get());
2072  update_transaction_parameters->setInputSourceNode(node);
2073  CHECK(update_transaction_parameters);
2074  auto update_callback = yieldUpdateCallback(*update_transaction_parameters);
2075  try {
2076  auto table_update_metadata =
2077  executor_->executeUpdate(ra_exe_unit,
2078  table_infos,
2079  table_descriptor,
2080  co_project,
2081  eo,
2082  *catalog,
2083  executor_->row_set_mem_owner_,
2084  update_callback,
2085  is_aggregate);
2086  post_execution_callback_ = [table_update_metadata, this, catalog]() {
2087  dml_transaction_parameters_->finalizeTransaction(*catalog);
2088  TableOptimizer table_optimizer{
2089  dml_transaction_parameters_->getTableDescriptor(), executor_, *catalog};
2090  table_optimizer.vacuumFragmentsAboveMinSelectivity(table_update_metadata);
2091  };
2092  } catch (const QueryExecutionError& e) {
2093  throw std::runtime_error(getErrorMessageFromCode(e.getErrorCode()));
2094  }
2095  };
2096 
2097  if (dml_transaction_parameters_->tableIsTemporary()) {
2098  // hold owned target exprs during execution if rewriting
2099  auto query_rewrite = std::make_unique<QueryRewriter>(table_infos, executor_);
2100  // rewrite temp table updates to generate the full column by moving the where
2101  // clause into a case if such a rewrite is not possible, bail on the update
2102  // operation build an expr for the update target
2103  auto update_transaction_params =
2104  dynamic_cast<UpdateTransactionParameters*>(dml_transaction_parameters_.get());
2105  CHECK(update_transaction_params);
2106  const auto td = update_transaction_params->getTableDescriptor();
2107  CHECK(td);
2108  const auto update_column_names = update_transaction_params->getUpdateColumnNames();
2109  if (update_column_names.size() > 1) {
2110  throw std::runtime_error(
2111  "Multi-column update is not yet supported for temporary tables.");
2112  }
2113 
2114  const auto cd =
2115  catalog->getMetadataForColumn(td->tableId, update_column_names.front());
2116  CHECK(cd);
2117  auto projected_column_to_update = makeExpr<Analyzer::ColumnVar>(
2118  cd->columnType,
2119  shared::ColumnKey{catalog->getDatabaseId(), td->tableId, cd->columnId},
2120  0);
2121  const auto rewritten_exe_unit = query_rewrite->rewriteColumnarUpdate(
2122  work_unit.exe_unit, projected_column_to_update);
2123  if (rewritten_exe_unit.target_exprs.front()->get_type_info().is_varlen()) {
2124  throw std::runtime_error(
2125  "Variable length updates not yet supported on temporary tables.");
2126  }
2127  execute_update_ra_exe_unit(rewritten_exe_unit, is_aggregate);
2128  } else {
2129  execute_update_ra_exe_unit(work_unit.exe_unit, is_aggregate);
2130  }
2131  };
2132 
2133  if (auto compound = dynamic_cast<const RelCompound*>(node)) {
2134  auto work_unit = createCompoundWorkUnit(compound, SortInfo(), eo_in);
2135 
2136  execute_update_for_node(compound, work_unit, compound->isAggregate());
2137  } else if (auto project = dynamic_cast<const RelProject*>(node)) {
2138  auto work_unit = createProjectWorkUnit(project, SortInfo(), eo_in);
2139 
2140  if (project->isSimple()) {
2141  CHECK_EQ(size_t(1), project->inputCount());
2142  const auto input_ra = project->getInput(0);
2143  if (dynamic_cast<const RelSort*>(input_ra)) {
2144  const auto& input_table =
2145  get_temporary_table(&temporary_tables_, -input_ra->getId());
2146  CHECK(input_table);
2147  work_unit.exe_unit.scan_limit = input_table->rowCount();
2148  }
2149  }
2150  if (project->hasWindowFunctionExpr() || project->hasPushedDownWindowExpr()) {
2151  // the first condition means this project node has at least one window function
2152  // and the second condition indicates that this project node falls into
2153  // one of the following cases:
2154  // 1) window function expression on a multi-fragmented table
2155  // 2) window function expression is too complex to evaluate without codegen:
2156  // i.e., sum(x+y+z) instead of sum(x) -> we currently do not support codegen to
2157  // evaluate such a complex window function expression
2158  // 3) nested window function expression
2159  // but currently we do not support update on a multi-fragmented table having
2160  // window function, so the second condition only refers to non-fragmented table with
2161  // cases 2) or 3)
2162  // if at least one of two conditions satisfy, we must compute corresponding window
2163  // context before entering `execute_update_for_node` to properly update the table
2164  if (!leaf_results_.empty()) {
2165  throw std::runtime_error(
2166  "Update query having window function is not yet supported in distributed "
2167  "mode.");
2168  }
2169  ColumnCacheMap column_cache;
2171  computeWindow(work_unit, co, eo_in, column_cache, queue_time_ms);
2172  }
2173  execute_update_for_node(project, work_unit, false);
2174  } else {
2175  throw std::runtime_error("Unsupported parent node for update: " +
2176  node->toString(RelRexToStringConfig::defaults()));
2177  }
2178 }
#define CHECK_EQ(x, y)
Definition: Logger.h:301
std::optional< std::function< void()> > post_execution_callback_
int32_t getErrorCode() const
Definition: ErrorHandling.h:63
WorkUnit createProjectWorkUnit(const RelProject *, const SortInfo &, const ExecutionOptions &eo)
TemporaryTables temporary_tables_
Driver for running cleanup processes on a table. TableOptimizer provides functions for various cleanu...
std::unordered_map< unsigned, AggregatedResult > leaf_results_
void computeWindow(const WorkUnit &work_unit, const CompilationOptions &co, const ExecutionOptions &eo, ColumnCacheMap &column_cache_map, const int64_t queue_time_ms)
WorkUnit createCompoundWorkUnit(const RelCompound *, const SortInfo &, const ExecutionOptions &eo)