OmniSciDB  72c90bc290
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
RelAlgExecutor Class Reference

#include <RelAlgExecutor.h>

+ Inheritance diagram for RelAlgExecutor:
+ Collaboration diagram for RelAlgExecutor:

Classes

struct  TableFunctionWorkUnit
 
struct  WorkUnit
 

Public Types

using TargetInfoList = std::vector< TargetInfo >
 

Public Member Functions

 RelAlgExecutor (Executor *executor, std::shared_ptr< const query_state::QueryState > query_state=nullptr)
 
 RelAlgExecutor (Executor *executor, const std::string &query_ra, std::shared_ptr< const query_state::QueryState > query_state=nullptr)
 
 RelAlgExecutor (Executor *executor, std::unique_ptr< RelAlgDag > query_dag, std::shared_ptr< const query_state::QueryState > query_state=nullptr)
 
size_t getOuterFragmentCount (const CompilationOptions &co, const ExecutionOptions &eo)
 
ExecutionResult executeRelAlgQuery (const CompilationOptions &co, const ExecutionOptions &eo, const bool just_explain_plan, const bool explain_verbose, RenderInfo *render_info)
 
ExecutionResult executeRelAlgQueryWithFilterPushDown (const RaExecutionSequence &seq, const CompilationOptions &co, const ExecutionOptions &eo, RenderInfo *render_info, const int64_t queue_time_ms)
 
void prepareLeafExecution (const AggregatedColRange &agg_col_range, const StringDictionaryGenerations &string_dictionary_generations, const TableGenerations &table_generations)
 
ExecutionResult executeRelAlgSeq (const RaExecutionSequence &seq, const CompilationOptions &co, const ExecutionOptions &eo, RenderInfo *render_info, const int64_t queue_time_ms, const bool with_existing_temp_tables=false)
 
ExecutionResult executeRelAlgSubSeq (const RaExecutionSequence &seq, const std::pair< size_t, size_t > interval, const CompilationOptions &co, const ExecutionOptions &eo, RenderInfo *render_info, const int64_t queue_time_ms)
 
QueryStepExecutionResult executeRelAlgQuerySingleStep (const RaExecutionSequence &seq, const size_t step_idx, const CompilationOptions &co, const ExecutionOptions &eo, RenderInfo *render_info)
 
void addLeafResult (const unsigned id, const AggregatedResult &result)
 
std::unique_ptr< RelAlgDag > getOwnedRelAlgDag ()
 
RelAlgDag * getRelAlgDag ()
 
const RelAlgNode & getRootRelAlgNode () const
 
void prepareForeignTables ()
 
std::shared_ptr< const RelAlgNode > getRootRelAlgNodeShPtr () const
 
std::pair< std::vector
< unsigned >
, std::unordered_map< unsigned,
JoinQualsPerNestingLevel > > 
getJoinInfo (const RelAlgNode *root_node)
 
std::shared_ptr< RelAlgTranslator > getRelAlgTranslator (const RelAlgNode *root_node)
 
const std::vector
< std::shared_ptr< RexSubQuery > > & 
getSubqueries () const noexcept
 
std::optional
< RegisteredQueryHint > 
getParsedQueryHint (const RelAlgNode *node)
 
std::optional
< std::unordered_map< size_t,
std::unordered_map< unsigned,
RegisteredQueryHint > > > 
getParsedQueryHints ()
 
std::optional
< RegisteredQueryHint > 
getGlobalQueryHint ()
 
ExecutionResult executeSimpleInsert (const Analyzer::Query &insert_query, Fragmenter_Namespace::InsertDataLoader &inserter, const Catalog_Namespace::SessionInfo &session)
 
AggregatedColRange computeColRangesCache ()
 
StringDictionaryGenerations computeStringDictionaryGenerations ()
 
TableGenerations computeTableGenerations ()
 
Executor * getExecutor () const
 
void cleanupPostExecution ()
 
void executePostExecutionCallback ()
 
RaExecutionSequence getRaExecutionSequence (const RelAlgNode *root_node, Executor *executor)
 
void prepareForeignTable ()
 
std::unordered_set
< shared::TableKey > 
getPhysicalTableIds () const
 
void prepareForSystemTableExecution (const CompilationOptions &co) const
 

Static Public Member Functions

static std::string getErrorMessageFromCode (const int32_t error_code)
 

Private Member Functions

void initializeParallelismHints ()
 
ExecutionResult executeRelAlgQueryNoRetry (const CompilationOptions &co, const ExecutionOptions &eo, const bool just_explain_plan, const bool explain_verbose, RenderInfo *render_info)
 
void executeRelAlgStep (const RaExecutionSequence &seq, const size_t step_idx, const CompilationOptions &, const ExecutionOptions &, RenderInfo *, const int64_t queue_time_ms)
 
void executeUpdate (const RelAlgNode *node, const CompilationOptions &co, const ExecutionOptions &eo, const int64_t queue_time_ms)
 
void executeDelete (const RelAlgNode *node, const CompilationOptions &co, const ExecutionOptions &eo_in, const int64_t queue_time_ms)
 
ExecutionResult executeCompound (const RelCompound *, const CompilationOptions &, const ExecutionOptions &, RenderInfo *, const int64_t queue_time_ms)
 
ExecutionResult executeAggregate (const RelAggregate *aggregate, const CompilationOptions &co, const ExecutionOptions &eo, RenderInfo *render_info, const int64_t queue_time_ms)
 
ExecutionResult executeProject (const RelProject *, const CompilationOptions &, const ExecutionOptions &, RenderInfo *, const int64_t queue_time_ms, const std::optional< size_t > previous_count)
 
ExecutionResult executeTableFunction (const RelTableFunction *, const CompilationOptions &, const ExecutionOptions &, const int64_t queue_time_ms)
 
ExecutionResult executeFilter (const RelFilter *, const CompilationOptions &, const ExecutionOptions &, RenderInfo *, const int64_t queue_time_ms)
 
ExecutionResult executeSort (const RelSort *, const CompilationOptions &, const ExecutionOptions &, RenderInfo *, const int64_t queue_time_ms)
 
ExecutionResult executeLogicalValues (const RelLogicalValues *, const ExecutionOptions &)
 
ExecutionResult executeModify (const RelModify *modify, const ExecutionOptions &eo)
 
ExecutionResult executeUnion (const RelLogicalUnion *, const RaExecutionSequence &, const CompilationOptions &, const ExecutionOptions &, RenderInfo *, const int64_t queue_time_ms)
 
WorkUnit createSortInputWorkUnit (const RelSort *, std::list< Analyzer::OrderEntry > &order_entries, const ExecutionOptions &eo)
 
ExecutionResult executeWorkUnit (const WorkUnit &work_unit, const std::vector< TargetMetaInfo > &targets_meta, const bool is_agg, const CompilationOptions &co_in, const ExecutionOptions &eo_in, RenderInfo *, const int64_t queue_time_ms, const std::optional< size_t > previous_count=std::nullopt)
 
void computeWindow (const WorkUnit &work_unit, const CompilationOptions &co, const ExecutionOptions &eo, ColumnCacheMap &column_cache_map, const int64_t queue_time_ms)
 
std::unique_ptr
< WindowFunctionContext > 
createWindowFunctionContext (const Analyzer::WindowFunction *window_func, const std::shared_ptr< Analyzer::BinOper > &partition_key_cond, std::unordered_map< QueryPlanHash, std::shared_ptr< HashJoin >> &partition_cache, std::unordered_map< QueryPlanHash, size_t > &sorted_partition_key_ref_count_map, const WorkUnit &work_unit, const std::vector< InputTableInfo > &query_infos, const CompilationOptions &co, ColumnCacheMap &column_cache_map, std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner)
 
size_t getNDVEstimation (const WorkUnit &work_unit, const int64_t range, const bool is_agg, const CompilationOptions &co, const ExecutionOptions &eo)
 
bool hasDeletedRowInQuery (std::vector< InputTableInfo > const &) const
 
std::optional< size_t > getFilteredCountAll (const RelAlgExecutionUnit &ra_exe_unit, const bool is_agg, const CompilationOptions &co, const ExecutionOptions &eo)
 
FilterSelectivity getFilterSelectivity (const std::vector< std::shared_ptr< Analyzer::Expr >> &filter_expressions, const CompilationOptions &co, const ExecutionOptions &eo)
 
std::vector< PushedDownFilterInfo > selectFiltersToBePushedDown (const RelAlgExecutor::WorkUnit &work_unit, const CompilationOptions &co, const ExecutionOptions &eo)
 
bool isRowidLookup (const WorkUnit &work_unit)
 
ExecutionResult handleOutOfMemoryRetry (const RelAlgExecutor::WorkUnit &work_unit, const std::vector< TargetMetaInfo > &targets_meta, const bool is_agg, const CompilationOptions &co, const ExecutionOptions &eo, RenderInfo *render_info, const bool was_multifrag_kernel_launch, const int64_t queue_time_ms)
 
WorkUnit createWorkUnit (const RelAlgNode *, const SortInfo &, const ExecutionOptions &eo)
 
WorkUnit createCompoundWorkUnit (const RelCompound *, const SortInfo &, const ExecutionOptions &eo)
 
WorkUnit createAggregateWorkUnit (const RelAggregate *, const SortInfo &, const bool just_explain)
 
WorkUnit createProjectWorkUnit (const RelProject *, const SortInfo &, const ExecutionOptions &eo)
 
WorkUnit createFilterWorkUnit (const RelFilter *, const SortInfo &, const bool just_explain)
 
WorkUnit createJoinWorkUnit (const RelJoin *, const SortInfo &, const bool just_explain)
 
WorkUnit createUnionWorkUnit (const RelLogicalUnion *, const SortInfo &, const ExecutionOptions &eo)
 
TableFunctionWorkUnit createTableFunctionWorkUnit (const RelTableFunction *table_func, const bool just_explain, const bool is_gpu)
 
void addTemporaryTable (const int table_id, const ResultSetPtr &result)
 
void eraseFromTemporaryTables (const int table_id)
 
void handleNop (RaExecutionDesc &ed)
 
std::unordered_map< unsigned,
JoinQualsPerNestingLevel > & 
getLeftDeepJoinTreesInfo ()
 
JoinQualsPerNestingLevel translateLeftDeepJoinFilter (const RelLeftDeepInnerJoin *join, const std::vector< InputDescriptor > &input_descs, const std::unordered_map< const RelAlgNode *, int > &input_to_nest_level, const bool just_explain)
 
std::list< std::shared_ptr
< Analyzer::Expr > > 
makeJoinQuals (const RexScalar *join_condition, const std::vector< JoinType > &join_types, const std::unordered_map< const RelAlgNode *, int > &input_to_nest_level, const bool just_explain) const
 
void setHasStepForUnion (bool flag)
 
bool hasStepForUnion () const
 
bool canUseResultsetCache (const ExecutionOptions &eo, RenderInfo *render_info) const
 
void setupCaching (const RelAlgNode *ra)
 
- Private Member Functions inherited from StorageIOFacility
 StorageIOFacility (Executor *executor)
 
StorageIOFacility::UpdateCallback yieldUpdateCallback (UpdateTransactionParameters &update_parameters)
 
StorageIOFacility::UpdateCallback yieldDeleteCallback (DeleteTransactionParameters &delete_parameters)
 

Static Private Member Functions

static void handlePersistentError (const int32_t error_code)
 

Private Attributes

Executor * executor_
 
std::unique_ptr< RelAlgDag > query_dag_
 
std::shared_ptr< const
query_state::QueryState > 
query_state_
 
TemporaryTables temporary_tables_
 
time_t now_
 
std::unordered_map< unsigned,
JoinQualsPerNestingLevel > 
left_deep_join_info_
 
std::vector< std::shared_ptr
< Analyzer::Expr > > 
target_exprs_owned_
 
std::unordered_map< unsigned,
AggregatedResult > 
leaf_results_
 
int64_t queue_time_ms_
 
bool has_step_for_union_
 
std::unique_ptr
< TransactionParameters > 
dml_transaction_parameters_
 
std::optional< std::function
< void()> > 
post_execution_callback_
 

Static Private Attributes

static SpeculativeTopNBlacklist speculative_topn_blacklist_
 

Friends

class PendingExecutionClosure
 

Additional Inherited Members

- Private Types inherited from StorageIOFacility
using UpdateCallback = UpdateLogForFragment::Callback
 
using TableDescriptorType = TableDescriptor
 
using DeleteVictimOffsetList = std::vector< uint64_t >
 
using UpdateTargetOffsetList = std::vector< uint64_t >
 
using UpdateTargetTypeList = std::vector< TargetMetaInfo >
 
using UpdateTargetColumnNamesList = std::vector< std::string >
 
using TransactionLog = Fragmenter_Namespace::InsertOrderFragmenter::ModifyTransactionTracker
 
using TransactionLogPtr = std::unique_ptr< TransactionLog >
 
using ColumnValidationFunction = std::function< bool(std::string const &)>
 

Detailed Description

Definition at line 52 of file RelAlgExecutor.h.

Member Typedef Documentation

using RelAlgExecutor::TargetInfoList = std::vector<TargetInfo>

Definition at line 54 of file RelAlgExecutor.h.

Constructor & Destructor Documentation

RelAlgExecutor::RelAlgExecutor ( Executor *  executor,
std::shared_ptr< const query_state::QueryState >  query_state = nullptr 
)
inline

Definition at line 56 of file RelAlgExecutor.h.

References initializeParallelismHints().

58  : StorageIOFacility(executor)
59  , executor_(executor)
60  , query_state_(std::move(query_state))
61  , now_(0)
62  , queue_time_ms_(0) {
63  initializeParallelismHints();
64  }
int64_t queue_time_ms_
std::shared_ptr< const query_state::QueryState > query_state_
void initializeParallelismHints()
StorageIOFacility(Executor *executor)
Executor * executor_

+ Here is the call graph for this function:

RelAlgExecutor::RelAlgExecutor ( Executor *  executor,
const std::string &  query_ra,
std::shared_ptr< const query_state::QueryState >  query_state = nullptr 
)
inline

Definition at line 66 of file RelAlgExecutor.h.

References initializeParallelismHints().

69  : StorageIOFacility(executor)
70  , executor_(executor)
71  , query_dag_(RelAlgDagBuilder::buildDag(query_ra, true))
72  , query_state_(std::move(query_state))
73  , now_(0)
74  , queue_time_ms_(0) {
75  initializeParallelismHints();
76  }
int64_t queue_time_ms_
static std::unique_ptr< RelAlgDag > buildDag(const std::string &query_ra, const bool optimize_dag)
Definition: RelAlgDag.cpp:3335
std::unique_ptr< RelAlgDag > query_dag_
std::shared_ptr< const query_state::QueryState > query_state_
void initializeParallelismHints()
StorageIOFacility(Executor *executor)
Executor * executor_

+ Here is the call graph for this function:

RelAlgExecutor::RelAlgExecutor ( Executor *  executor,
std::unique_ptr< RelAlgDag >  query_dag,
std::shared_ptr< const query_state::QueryState >  query_state = nullptr 
)
inline

Definition at line 78 of file RelAlgExecutor.h.

References initializeParallelismHints().

81  : StorageIOFacility(executor)
82  , executor_(executor)
83  , query_dag_(std::move(query_dag))
84  , query_state_(std::move(query_state))
85  , now_(0)
86  , queue_time_ms_(0) {
87  initializeParallelismHints();
88  }
int64_t queue_time_ms_
std::unique_ptr< RelAlgDag > query_dag_
std::shared_ptr< const query_state::QueryState > query_state_
void initializeParallelismHints()
StorageIOFacility(Executor *executor)
Executor * executor_

+ Here is the call graph for this function:

Member Function Documentation

void RelAlgExecutor::addLeafResult ( const unsigned  id,
const AggregatedResult &  result 
)
inline

Definition at line 129 of file RelAlgExecutor.h.

References CHECK, and leaf_results_.

129  {
130  const auto it_ok = leaf_results_.emplace(id, result);
131  CHECK(it_ok.second);
132  }
std::unordered_map< unsigned, AggregatedResult > leaf_results_
#define CHECK(condition)
Definition: Logger.h:291
void RelAlgExecutor::addTemporaryTable ( const int  table_id,
const ResultSetPtr &  result 
)
inline private

Definition at line 400 of file RelAlgExecutor.h.

References CHECK, CHECK_LT, and temporary_tables_.

Referenced by executeRelAlgSeq(), executeRelAlgStep(), executeSort(), executeTableFunction(), and handleNop().

400  {
401  CHECK_LT(size_t(0), result->colCount());
402  CHECK_LT(table_id, 0);
403  auto it_ok = temporary_tables_.emplace(table_id, result);
404  CHECK(it_ok.second);
405  }
TemporaryTables temporary_tables_
#define CHECK_LT(x, y)
Definition: Logger.h:303
#define CHECK(condition)
Definition: Logger.h:291

+ Here is the caller graph for this function:

bool RelAlgExecutor::canUseResultsetCache ( const ExecutionOptions &  eo,
RenderInfo *  render_info 
) const
private

Definition at line 482 of file RelAlgExecutor.cpp.

References g_cluster, g_enable_data_recycler, g_use_query_resultset_cache, hasStepForUnion(), is_validate_or_explain_query(), heavyai::InSituFlagsOwnerInterface::isInSitu(), and ExecutionOptions::outer_fragment_indices.

Referenced by executeRelAlgStep(), executeSort(), executeTableFunction(), and executeWorkUnit().

483  {
484  auto validate_or_explain_query = is_validate_or_explain_query(eo);
485  auto query_for_partial_outer_frag = !eo.outer_fragment_indices.empty();
486  return g_enable_data_recycler && g_use_query_resultset_cache && !g_cluster &&
487  !validate_or_explain_query && !hasStepForUnion() &&
488  !query_for_partial_outer_frag &&
489  (!render_info || (render_info && !render_info->isInSitu()));
490 }
bool g_use_query_resultset_cache
Definition: Execute.cpp:156
bool is_validate_or_explain_query(const ExecutionOptions &eo)
std::vector< size_t > outer_fragment_indices
bool g_enable_data_recycler
Definition: Execute.cpp:154
bool hasStepForUnion() const
bool g_cluster

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void RelAlgExecutor::cleanupPostExecution ( )

Definition at line 775 of file RelAlgExecutor.cpp.

References CHECK, and executor_.

Referenced by executeRelAlgQueryNoRetry(), and getOuterFragmentCount().

775  {
776  CHECK(executor_);
777  executor_->row_set_mem_owner_ = nullptr;
778 }
#define CHECK(condition)
Definition: Logger.h:291
Executor * executor_

+ Here is the caller graph for this function:

AggregatedColRange RelAlgExecutor::computeColRangesCache ( )

Definition at line 755 of file RelAlgExecutor.cpp.

References executor_, anonymous_namespace{RelAlgExecutor.cpp}::get_physical_inputs_with_spi_col_id(), and getRootRelAlgNode().

755  {
756  AggregatedColRange agg_col_range_cache;
757  const auto phys_inputs = get_physical_inputs_with_spi_col_id(&getRootRelAlgNode());
758  return executor_->computeColRangesCache(phys_inputs);
759 }
const RelAlgNode & getRootRelAlgNode() const
std::unordered_set< PhysicalInput > get_physical_inputs_with_spi_col_id(const RelAlgNode *ra)
Executor * executor_

+ Here is the call graph for this function:

StringDictionaryGenerations RelAlgExecutor::computeStringDictionaryGenerations ( )

Definition at line 761 of file RelAlgExecutor.cpp.

References executor_, anonymous_namespace{RelAlgExecutor.cpp}::get_physical_inputs_with_spi_col_id(), and getRootRelAlgNode().

761  {
762  const auto phys_inputs = get_physical_inputs_with_spi_col_id(&getRootRelAlgNode());
763  return executor_->computeStringDictionaryGenerations(phys_inputs);
764 }
const RelAlgNode & getRootRelAlgNode() const
std::unordered_set< PhysicalInput > get_physical_inputs_with_spi_col_id(const RelAlgNode *ra)
Executor * executor_

+ Here is the call graph for this function:

TableGenerations RelAlgExecutor::computeTableGenerations ( )

Definition at line 766 of file RelAlgExecutor.cpp.

References executor_, get_physical_table_inputs(), and getRootRelAlgNode().

766  {
767  const auto phys_table_ids = get_physical_table_inputs(&getRootRelAlgNode());
768  return executor_->computeTableGenerations(phys_table_ids);
769 }
std::unordered_set< shared::TableKey > get_physical_table_inputs(const RelAlgNode *ra)
const RelAlgNode & getRootRelAlgNode() const
Executor * executor_

+ Here is the call graph for this function:

void RelAlgExecutor::computeWindow ( const WorkUnit &  work_unit,
const CompilationOptions &  co,
const ExecutionOptions &  eo,
ColumnCacheMap &  column_cache_map,
const int64_t  queue_time_ms 
)
private

Definition at line 2483 of file RelAlgExecutor.cpp.

References CHECK, CHECK_EQ, WindowProjectNodeContext::create(), createWindowFunctionContext(), RelAlgExecutor::WorkUnit::exe_unit, executor_, ExecutionOptions::executor_type, Extern, get_table_infos(), Analyzer::WindowFunction::getPartitionKeys(), RelAlgExecutionUnit::input_descs, kBOOLEAN, kBW_EQ, kONE, RelAlgExecutionUnit::target_exprs, and anonymous_namespace{RelAlgExecutor.cpp}::transform_to_inner().

Referenced by executeUpdate(), and executeWorkUnit().

2487  {
2488  auto query_infos = get_table_infos(work_unit.exe_unit.input_descs, executor_);
2489  CHECK_EQ(query_infos.size(), size_t(1));
2490  if (query_infos.front().info.fragments.size() != 1) {
2491  throw std::runtime_error(
2492  "Only single fragment tables supported for window functions for now");
2493  }
2494  if (eo.executor_type == ::ExecutorType::Extern) {
2495  return;
2496  }
2497  query_infos.push_back(query_infos.front());
2498  auto window_project_node_context = WindowProjectNodeContext::create(executor_);
2499  // a query may hold multiple window functions having the same partition by condition
2500  // then after building the first hash partition we can reuse it for the rest of
2501  // the window functions
2502  // here, a cached partition can be shared via multiple window function contexts as is
2503  // but sorted partition should be copied to reuse since we use it for (intermediate)
2504  // output buffer
2505  // todo (yoonmin) : support recycler for window function computation?
2506  std::unordered_map<QueryPlanHash, std::shared_ptr<HashJoin>> partition_cache;
2507  std::unordered_map<QueryPlanHash, std::shared_ptr<std::vector<int64_t>>>
2508  sorted_partition_cache;
2509  std::unordered_map<QueryPlanHash, size_t> sorted_partition_key_ref_count_map;
2510  std::unordered_map<size_t, std::unique_ptr<WindowFunctionContext>>
2511  window_function_context_map;
2512  std::unordered_map<QueryPlanHash, AggregateTreeForWindowFraming> aggregate_tree_map;
2513  for (size_t target_index = 0; target_index < work_unit.exe_unit.target_exprs.size();
2514  ++target_index) {
2515  const auto& target_expr = work_unit.exe_unit.target_exprs[target_index];
2516  const auto window_func = dynamic_cast<const Analyzer::WindowFunction*>(target_expr);
2517  if (!window_func) {
2518  continue;
2519  }
2520  // Always use baseline layout hash tables for now, make the expression a tuple.
2521  const auto& partition_keys = window_func->getPartitionKeys();
2522  std::shared_ptr<Analyzer::BinOper> partition_key_cond;
2523  if (partition_keys.size() >= 1) {
2524  std::shared_ptr<Analyzer::Expr> partition_key_tuple;
2525  if (partition_keys.size() > 1) {
2526  partition_key_tuple = makeExpr<Analyzer::ExpressionTuple>(partition_keys);
2527  } else {
2528  CHECK_EQ(partition_keys.size(), size_t(1));
2529  partition_key_tuple = partition_keys.front();
2530  }
2531  // Creates a tautology equality with the partition expression on both sides.
2532  partition_key_cond =
2533  makeExpr<Analyzer::BinOper>(kBOOLEAN,
2534  kBW_EQ,
2535  kONE,
2536  partition_key_tuple,
2537  transform_to_inner(partition_key_tuple.get()));
2538  }
2539  auto context =
2540  createWindowFunctionContext(window_func,
2541  partition_key_cond /*nullptr if no partition key*/,
2542  partition_cache,
2543  sorted_partition_key_ref_count_map,
2544  work_unit,
2545  query_infos,
2546  co,
2547  column_cache_map,
2548  executor_->getRowSetMemoryOwner());
2549  CHECK(window_function_context_map.emplace(target_index, std::move(context)).second);
2550  }
2551 
2552  for (auto& kv : window_function_context_map) {
2553  kv.second->compute(
2554  sorted_partition_key_ref_count_map, sorted_partition_cache, aggregate_tree_map);
2555  window_project_node_context->addWindowFunctionContext(std::move(kv.second), kv.first);
2556  }
2557 }
#define CHECK_EQ(x, y)
Definition: Logger.h:301
static WindowProjectNodeContext * create(Executor *executor)
std::shared_ptr< Analyzer::Expr > transform_to_inner(const Analyzer::Expr *expr)
std::unique_ptr< WindowFunctionContext > createWindowFunctionContext(const Analyzer::WindowFunction *window_func, const std::shared_ptr< Analyzer::BinOper > &partition_key_cond, std::unordered_map< QueryPlanHash, std::shared_ptr< HashJoin >> &partition_cache, std::unordered_map< QueryPlanHash, size_t > &sorted_partition_key_ref_count_map, const WorkUnit &work_unit, const std::vector< InputTableInfo > &query_infos, const CompilationOptions &co, ColumnCacheMap &column_cache_map, std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner)
ExecutorType executor_type
Definition: sqldefs.h:71
#define CHECK(condition)
Definition: Logger.h:291
std::vector< InputTableInfo > get_table_infos(const std::vector< InputDescriptor > &input_descs, Executor *executor)
Definition: sqldefs.h:30
const std::vector< std::shared_ptr< Analyzer::Expr > > & getPartitionKeys() const
Definition: Analyzer.h:2798
Executor * executor_

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

RelAlgExecutor::WorkUnit RelAlgExecutor::createAggregateWorkUnit ( const RelAggregate *  aggregate,
const SortInfo &  sort_info,
const bool  just_explain 
)
private

Definition at line 4873 of file RelAlgExecutor.cpp.

References QueryPlanDagExtractor::applyLimitClauseToCacheKey(), CHECK_EQ, RegisteredQueryHint::defaults(), executor_, QueryPlanDagExtractor::extractJoinInfo(), g_default_max_groups_buffer_entry_guess, anonymous_namespace{RelAlgExecutor.cpp}::get_input_desc(), anonymous_namespace{RelAlgExecutor.cpp}::get_input_nest_levels(), anonymous_namespace{RelAlgExecutor.cpp}::get_join_type(), get_table_infos(), anonymous_namespace{RelAlgExecutor.cpp}::get_targets_meta(), RelAlgNode::getInput(), getLeftDeepJoinTreesInfo(), RelAlgNode::getOutputMetainfo(), RelAlgNode::getQueryPlanDagHash(), RelAlgNode::inputCount(), now_, query_dag_, query_state_, RelAlgNode::setOutputMetainfo(), anonymous_namespace{RelAlgExecutor.cpp}::synthesize_inputs(), target_exprs_owned_, anonymous_namespace{RelAlgExecutor.cpp}::translate_groupby_exprs(), and anonymous_namespace{RelAlgExecutor.cpp}::translate_targets().

Referenced by createWorkUnit(), and executeAggregate().

4876  {
4877  std::vector<InputDescriptor> input_descs;
4878  std::list<std::shared_ptr<const InputColDescriptor>> input_col_descs;
4879  std::vector<std::shared_ptr<RexInput>> used_inputs_owned;
4880  const auto input_to_nest_level = get_input_nest_levels(aggregate, {});
4881  std::tie(input_descs, input_col_descs, used_inputs_owned) =
4882  get_input_desc(aggregate, input_to_nest_level, {});
4883  const auto join_type = get_join_type(aggregate);
4884 
4885  RelAlgTranslator translator(
4886  query_state_, executor_, input_to_nest_level, {join_type}, now_, just_explain);
4887  CHECK_EQ(size_t(1), aggregate->inputCount());
4888  const auto source = aggregate->getInput(0);
4889  const auto& in_metainfo = source->getOutputMetainfo();
4890  const auto scalar_sources =
4891  synthesize_inputs(aggregate, size_t(0), in_metainfo, input_to_nest_level);
4892  const auto groupby_exprs = translate_groupby_exprs(aggregate, scalar_sources);
4893  std::unordered_map<size_t, SQLTypeInfo> target_exprs_type_infos;
4894  const auto target_exprs = translate_targets(target_exprs_owned_,
4895  target_exprs_type_infos,
4896  scalar_sources,
4897  groupby_exprs,
4898  aggregate,
4899  translator);
4900 
4901  const auto query_infos = get_table_infos(input_descs, executor_);
4902 
4903  const auto targets_meta = get_targets_meta(aggregate, target_exprs);
4904  aggregate->setOutputMetainfo(targets_meta);
4905  auto query_hint = RegisteredQueryHint::defaults();
4906  if (query_dag_) {
4907  auto candidate = query_dag_->getQueryHint(aggregate);
4908  if (candidate) {
4909  query_hint = *candidate;
4910  }
4911  }
4912  auto join_info = QueryPlanDagExtractor::extractJoinInfo(
4913  aggregate, std::nullopt, getLeftDeepJoinTreesInfo(), executor_);
4914  return {RelAlgExecutionUnit{input_descs,
4915  input_col_descs,
4916  {},
4917  {},
4918  {},
4919  groupby_exprs,
4920  target_exprs,
4921  target_exprs_type_infos,
4922  nullptr,
4923  sort_info,
4924  0,
4925  query_hint,
4926  QueryPlanDagExtractor::applyLimitClauseToCacheKey(
4927  aggregate->getQueryPlanDagHash(), sort_info),
4928  join_info.hash_table_plan_dag,
4929  join_info.table_id_to_node_map,
4930  false,
4931  std::nullopt,
4932  query_state_},
4933  aggregate,
4934  g_default_max_groups_buffer_entry_guess,
4935  nullptr};
4936 }
void setOutputMetainfo(std::vector< TargetMetaInfo > targets_metainfo) const
Definition: RelAlgDag.h:850
#define CHECK_EQ(x, y)
Definition: Logger.h:301
std::unordered_map< const RelAlgNode *, int > get_input_nest_levels(const RelAlgNode *ra_node, const std::vector< size_t > &input_permutation)
static ExtractedJoinInfo extractJoinInfo(const RelAlgNode *top_node, std::optional< unsigned > left_deep_tree_id, std::unordered_map< unsigned, JoinQualsPerNestingLevel > left_deep_tree_infos, Executor *executor)
std::vector< std::shared_ptr< Analyzer::Expr > > synthesize_inputs(const RelAlgNode *ra_node, const size_t nest_level, const std::vector< TargetMetaInfo > &in_metainfo, const std::unordered_map< const RelAlgNode *, int > &input_to_nest_level)
static size_t applyLimitClauseToCacheKey(size_t cache_key, SortInfo const &sort_info)
std::tuple< std::vector< InputDescriptor >, std::list< std::shared_ptr< const InputColDescriptor > >, std::vector< std::shared_ptr< RexInput > > > get_input_desc(const RA *ra_node, const std::unordered_map< const RelAlgNode *, int > &input_to_nest_level, const std::vector< size_t > &input_permutation)
std::unordered_map< unsigned, JoinQualsPerNestingLevel > & getLeftDeepJoinTreesInfo()
size_t getQueryPlanDagHash() const
Definition: RelAlgDag.h:863
std::vector< std::shared_ptr< Analyzer::Expr > > target_exprs_owned_
std::vector< Analyzer::Expr * > translate_targets(std::vector< std::shared_ptr< Analyzer::Expr >> &target_exprs_owned, std::unordered_map< size_t, SQLTypeInfo > &target_exprs_type_infos, const std::vector< std::shared_ptr< Analyzer::Expr >> &scalar_sources, const std::list< std::shared_ptr< Analyzer::Expr >> &groupby_exprs, const RelCompound *compound, const RelAlgTranslator &translator, const ExecutorType executor_type)
std::list< std::shared_ptr< Analyzer::Expr > > translate_groupby_exprs(const RelCompound *compound, const std::vector< std::shared_ptr< Analyzer::Expr >> &scalar_sources)
const RelAlgNode * getInput(const size_t idx) const
Definition: RelAlgDag.h:877
JoinType get_join_type(const RelAlgNode *ra)
std::vector< TargetMetaInfo > get_targets_meta(const RA *ra_node, const std::vector< Analyzer::Expr * > &target_exprs)
std::unique_ptr< RelAlgDag > query_dag_
static RegisteredQueryHint defaults()
Definition: QueryHint.h:364
std::shared_ptr< const query_state::QueryState > query_state_
size_t g_default_max_groups_buffer_entry_guess
Definition: Execute.cpp:114
std::vector< InputTableInfo > get_table_infos(const std::vector< InputDescriptor > &input_descs, Executor *executor)
const size_t inputCount() const
Definition: RelAlgDag.h:875
const std::vector< TargetMetaInfo > & getOutputMetainfo() const
Definition: RelAlgDag.h:865
Executor * executor_

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

RelAlgExecutor::WorkUnit RelAlgExecutor::createCompoundWorkUnit ( const RelCompound *  compound,
const SortInfo &  sort_info,
const ExecutionOptions &  eo 
)
private

Definition at line 4537 of file RelAlgExecutor.cpp.

References QueryPlanDagExtractor::applyLimitClauseToCacheKey(), CHECK_EQ, convert_bbox_intersect_join(), RegisteredQueryHint::defaults(), anonymous_namespace{RelAlgExecutor.cpp}::do_table_reordering(), executor_, ExecutionOptions::executor_type, QueryPlanDagExtractor::extractJoinInfo(), g_default_max_groups_buffer_entry_guess, g_from_table_reordering, anonymous_namespace{RelAlgExecutor.cpp}::get_input_desc(), anonymous_namespace{RelAlgExecutor.cpp}::get_input_nest_levels(), anonymous_namespace{RelAlgExecutor.cpp}::get_join_type(), anonymous_namespace{RelAlgExecutor.cpp}::get_left_deep_join_input_sizes(), get_table_infos(), anonymous_namespace{RelAlgExecutor.cpp}::get_targets_meta(), RelAlgNode::getInput(), getLeftDeepJoinTreesInfo(), RelAlgNode::getQueryPlanDagHash(), anonymous_namespace{RelAlgExecutor.cpp}::has_valid_query_plan_dag(), RelAlgNode::inputCount(), ExecutionOptions::just_explain, LEFT, anonymous_namespace{RelAlgExecutor.cpp}::left_deep_join_types(), now_, shared::printContainer(), query_dag_, query_state_, anonymous_namespace{RelAlgExecutor.cpp}::rewrite_quals(), RelAlgNode::setOutputMetainfo(), RelAlgExecutionUnit::simple_quals, RelCompound::size(), target_exprs_owned_, anonymous_namespace{RelAlgExecutor.cpp}::translate_groupby_exprs(), anonymous_namespace{RelAlgExecutor.cpp}::translate_quals(), anonymous_namespace{RelAlgExecutor.cpp}::translate_scalar_sources(), anonymous_namespace{RelAlgExecutor.cpp}::translate_targets(), translateLeftDeepJoinFilter(), and VLOG.

Referenced by createWorkUnit(), executeCompound(), executeDelete(), executeUpdate(), and getOuterFragmentCount().

4540  {
4541  std::vector<InputDescriptor> input_descs;
4542  std::list<std::shared_ptr<const InputColDescriptor>> input_col_descs;
4543  auto input_to_nest_level = get_input_nest_levels(compound, {});
4544  std::tie(input_descs, input_col_descs, std::ignore) =
4545  get_input_desc(compound, input_to_nest_level, {});
4546  VLOG(3) << "input_descs=" << shared::printContainer(input_descs);
4547  VLOG(3) << "input_col_descs=" << shared::printContainer(input_col_descs);
4548  auto query_infos = get_table_infos(input_descs, executor_);
4549  CHECK_EQ(size_t(1), compound->inputCount());
4550  const auto left_deep_join =
4551  dynamic_cast<const RelLeftDeepInnerJoin*>(compound->getInput(0));
4552  JoinQualsPerNestingLevel left_deep_join_quals;
4553  const auto join_types = left_deep_join ? left_deep_join_types(left_deep_join)
4554  : std::vector<JoinType>{get_join_type(compound)};
4555  std::vector<size_t> input_permutation;
4556  std::vector<size_t> left_deep_join_input_sizes;
4557  std::optional<unsigned> left_deep_tree_id;
4558  if (left_deep_join) {
4559  left_deep_tree_id = left_deep_join->getId();
4560  left_deep_join_input_sizes = get_left_deep_join_input_sizes(left_deep_join);
4561  left_deep_join_quals = translateLeftDeepJoinFilter(
4562  left_deep_join, input_descs, input_to_nest_level, eo.just_explain);
4563  if (g_from_table_reordering &&
4564  std::find(join_types.begin(), join_types.end(), JoinType::LEFT) ==
4565  join_types.end()) {
4566  input_permutation = do_table_reordering(input_descs,
4567  input_col_descs,
4568  left_deep_join_quals,
4569  input_to_nest_level,
4570  compound,
4571  query_infos,
4572  executor_);
4573  input_to_nest_level = get_input_nest_levels(compound, input_permutation);
4574  std::tie(input_descs, input_col_descs, std::ignore) =
4575  get_input_desc(compound, input_to_nest_level, input_permutation);
4576  left_deep_join_quals = translateLeftDeepJoinFilter(
4577  left_deep_join, input_descs, input_to_nest_level, eo.just_explain);
4578  }
4579  }
4580  auto const bbox_intersect_qual_info = convert_bbox_intersect_join(left_deep_join_quals,
4581  input_descs,
4582  input_to_nest_level,
4583  input_permutation,
4584  input_col_descs,
4585  executor_);
4586  if (bbox_intersect_qual_info.is_reordered) {
4587  query_infos = get_table_infos(input_descs, executor_);
4588  VLOG(3) << "input_descs=" << shared::printContainer(input_descs);
4589  VLOG(3) << "input_col_descs=" << shared::printContainer(input_col_descs);
4590  }
4591  if (bbox_intersect_qual_info.has_bbox_intersect_join) {
4592  left_deep_join_quals = bbox_intersect_qual_info.join_quals;
4593  }
4594  RelAlgTranslator translator(
4595  query_state_, executor_, input_to_nest_level, join_types, now_, eo.just_explain);
4596  const auto quals_cf = translate_quals(compound, translator);
4597  const auto quals = rewrite_quals(quals_cf.quals);
4598  const auto scalar_sources =
4599  translate_scalar_sources(compound, translator, eo.executor_type);
4600  const auto groupby_exprs = translate_groupby_exprs(compound, scalar_sources);
4601  std::unordered_map<size_t, SQLTypeInfo> target_exprs_type_infos;
4602  const auto target_exprs = translate_targets(target_exprs_owned_,
4603  target_exprs_type_infos,
4604  scalar_sources,
4605  groupby_exprs,
4606  compound,
4607  translator,
4608  eo.executor_type);
4609 
4610  auto query_hint = RegisteredQueryHint::defaults();
4611  if (query_dag_) {
4612  auto candidate = query_dag_->getQueryHint(compound);
4613  if (candidate) {
4614  query_hint = *candidate;
4615  }
4616  }
4617  CHECK_EQ(compound->size(), target_exprs.size());
4618  const RelAlgExecutionUnit exe_unit = {input_descs,
4619  input_col_descs,
4620  quals_cf.simple_quals,
4621  quals,
4622  left_deep_join_quals,
4623  groupby_exprs,
4624  target_exprs,
4625  target_exprs_type_infos,
4626  nullptr,
4627  sort_info,
4628  0,
4629  query_hint,
4630  QueryPlanDagExtractor::applyLimitClauseToCacheKey(
4631  compound->getQueryPlanDagHash(), sort_info),
4632  {},
4633  {},
4634  false,
4635  std::nullopt,
4636  query_state_};
4637  auto query_rewriter = std::make_unique<QueryRewriter>(query_infos, executor_);
4638  auto rewritten_exe_unit = query_rewriter->rewrite(exe_unit);
4639  const auto targets_meta = get_targets_meta(compound, rewritten_exe_unit.target_exprs);
4640  compound->setOutputMetainfo(targets_meta);
4641  auto& left_deep_trees_info = getLeftDeepJoinTreesInfo();
4642  if (left_deep_tree_id && left_deep_tree_id.has_value()) {
4643  left_deep_trees_info.emplace(left_deep_tree_id.value(),
4644  rewritten_exe_unit.join_quals);
4645  }
4646  if (has_valid_query_plan_dag(compound)) {
4647  auto join_info = QueryPlanDagExtractor::extractJoinInfo(
4648  compound, left_deep_tree_id, left_deep_trees_info, executor_);
4649  rewritten_exe_unit.hash_table_build_plan_dag = join_info.hash_table_plan_dag;
4650  rewritten_exe_unit.table_id_to_node_map = join_info.table_id_to_node_map;
4651  }
4652  return {rewritten_exe_unit,
4653  compound,
4654  g_default_max_groups_buffer_entry_guess,
4655  std::move(query_rewriter),
4656  input_permutation,
4657  left_deep_join_input_sizes};
4658 }
void setOutputMetainfo(std::vector< TargetMetaInfo > targets_metainfo) const
Definition: RelAlgDag.h:850
#define CHECK_EQ(x, y)
Definition: Logger.h:301
std::unordered_map< const RelAlgNode *, int > get_input_nest_levels(const RelAlgNode *ra_node, const std::vector< size_t > &input_permutation)
std::vector< JoinType > left_deep_join_types(const RelLeftDeepInnerJoin *left_deep_join)
JoinType
Definition: sqldefs.h:174
bool has_valid_query_plan_dag(const RelAlgNode *node)
static ExtractedJoinInfo extractJoinInfo(const RelAlgNode *top_node, std::optional< unsigned > left_deep_tree_id, std::unordered_map< unsigned, JoinQualsPerNestingLevel > left_deep_tree_infos, Executor *executor)
std::vector< size_t > do_table_reordering(std::vector< InputDescriptor > &input_descs, std::list< std::shared_ptr< const InputColDescriptor >> &input_col_descs, const JoinQualsPerNestingLevel &left_deep_join_quals, std::unordered_map< const RelAlgNode *, int > &input_to_nest_level, const RA *node, const std::vector< InputTableInfo > &query_infos, const Executor *executor)
static size_t applyLimitClauseToCacheKey(size_t cache_key, SortInfo const &sort_info)
std::tuple< std::vector< InputDescriptor >, std::list< std::shared_ptr< const InputColDescriptor > >, std::vector< std::shared_ptr< RexInput > > > get_input_desc(const RA *ra_node, const std::unordered_map< const RelAlgNode *, int > &input_to_nest_level, const std::vector< size_t > &input_permutation)
size_t size() const override
Definition: RelAlgDag.h:2094
std::unordered_map< unsigned, JoinQualsPerNestingLevel > & getLeftDeepJoinTreesInfo()
std::vector< JoinCondition > JoinQualsPerNestingLevel
size_t getQueryPlanDagHash() const
Definition: RelAlgDag.h:863
std::vector< std::shared_ptr< Analyzer::Expr > > target_exprs_owned_
bool g_from_table_reordering
Definition: Execute.cpp:93
ExecutorType executor_type
std::vector< Analyzer::Expr * > translate_targets(std::vector< std::shared_ptr< Analyzer::Expr >> &target_exprs_owned, std::unordered_map< size_t, SQLTypeInfo > &target_exprs_type_infos, const std::vector< std::shared_ptr< Analyzer::Expr >> &scalar_sources, const std::list< std::shared_ptr< Analyzer::Expr >> &groupby_exprs, const RelCompound *compound, const RelAlgTranslator &translator, const ExecutorType executor_type)
std::list< std::shared_ptr< Analyzer::Expr > > translate_groupby_exprs(const RelCompound *compound, const std::vector< std::shared_ptr< Analyzer::Expr >> &scalar_sources)
const RelAlgNode * getInput(const size_t idx) const
Definition: RelAlgDag.h:877
JoinType get_join_type(const RelAlgNode *ra)
BoundingBoxIntersectJoinTranslationInfo convert_bbox_intersect_join(JoinQualsPerNestingLevel const &join_quals, std::vector< InputDescriptor > &input_descs, std::unordered_map< const RelAlgNode *, int > &input_to_nest_level, std::vector< size_t > &input_permutation, std::list< std::shared_ptr< const InputColDescriptor >> &input_col_desc, Executor const *executor)
std::vector< TargetMetaInfo > get_targets_meta(const RA *ra_node, const std::vector< Analyzer::Expr * > &target_exprs)
std::unique_ptr< RelAlgDag > query_dag_
static RegisteredQueryHint defaults()
Definition: QueryHint.h:364
std::shared_ptr< const query_state::QueryState > query_state_
std::vector< std::shared_ptr< Analyzer::Expr > > translate_scalar_sources(const RA *ra_node, const RelAlgTranslator &translator, const ::ExecutorType executor_type)
size_t g_default_max_groups_buffer_entry_guess
Definition: Execute.cpp:114
std::vector< InputTableInfo > get_table_infos(const std::vector< InputDescriptor > &input_descs, Executor *executor)
QualsConjunctiveForm translate_quals(const RelCompound *compound, const RelAlgTranslator &translator)
PrintContainer< CONTAINER > printContainer(CONTAINER &container)
Definition: misc.h:107
const size_t inputCount() const
Definition: RelAlgDag.h:875
Executor * executor_
#define VLOG(n)
Definition: Logger.h:388
std::list< std::shared_ptr< Analyzer::Expr > > simple_quals
JoinQualsPerNestingLevel translateLeftDeepJoinFilter(const RelLeftDeepInnerJoin *join, const std::vector< InputDescriptor > &input_descs, const std::unordered_map< const RelAlgNode *, int > &input_to_nest_level, const bool just_explain)
std::vector< size_t > get_left_deep_join_input_sizes(const RelLeftDeepInnerJoin *left_deep_join)
std::list< std::shared_ptr< Analyzer::Expr > > rewrite_quals(const std::list< std::shared_ptr< Analyzer::Expr >> &quals)

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

RelAlgExecutor::WorkUnit RelAlgExecutor::createFilterWorkUnit ( const RelFilter filter,
const SortInfo sort_info,
const bool  just_explain 
)
private

Definition at line 5424 of file RelAlgExecutor.cpp.

References QueryPlanDagExtractor::applyLimitClauseToCacheKey(), CHECK_EQ, RegisteredQueryHint::defaults(), executor_, QueryPlanDagExtractor::extractJoinInfo(), fold_expr(), g_default_max_groups_buffer_entry_guess, anonymous_namespace{RelAlgExecutor.cpp}::get_input_desc(), anonymous_namespace{RelAlgExecutor.cpp}::get_input_nest_levels(), anonymous_namespace{RelAlgExecutor.cpp}::get_inputs_meta(), anonymous_namespace{RelAlgExecutor.cpp}::get_join_type(), anonymous_namespace{RelAlgExecutor.cpp}::get_raw_pointers(), get_table_infos(), RelFilter::getCondition(), getLeftDeepJoinTreesInfo(), RelAlgNode::getQueryPlanDagHash(), RelAlgNode::inputCount(), now_, query_dag_, query_state_, rewrite_expr(), RelAlgNode::setOutputMetainfo(), and target_exprs_owned_.

Referenced by createWorkUnit(), and executeFilter().

5426  {
5427  CHECK_EQ(size_t(1), filter->inputCount());
5428  std::vector<InputDescriptor> input_descs;
5429  std::list<std::shared_ptr<const InputColDescriptor>> input_col_descs;
5430  std::vector<TargetMetaInfo> in_metainfo;
5431  std::vector<std::shared_ptr<RexInput>> used_inputs_owned;
5432  std::vector<std::shared_ptr<Analyzer::Expr>> target_exprs_owned;
5433 
5434  const auto input_to_nest_level = get_input_nest_levels(filter, {});
5435  std::tie(input_descs, input_col_descs, used_inputs_owned) =
5436  get_input_desc(filter, input_to_nest_level, {});
5437  const auto join_type = get_join_type(filter);
5438  RelAlgTranslator translator(
5439  query_state_, executor_, input_to_nest_level, {join_type}, now_, just_explain);
5440  std::tie(in_metainfo, target_exprs_owned) =
5441  get_inputs_meta(filter, translator, used_inputs_owned, input_to_nest_level);
5442  const auto filter_expr = translator.translate(filter->getCondition());
5443  const auto query_infos = get_table_infos(input_descs, executor_);
5444 
5445  const auto qual = fold_expr(filter_expr.get());
5446  target_exprs_owned_.insert(
5447  target_exprs_owned_.end(), target_exprs_owned.begin(), target_exprs_owned.end());
5448 
5449  const auto target_exprs = get_raw_pointers(target_exprs_owned);
5450  filter->setOutputMetainfo(in_metainfo);
5451  const auto rewritten_qual = rewrite_expr(qual.get());
5452  auto query_hint = RegisteredQueryHint::defaults();
5453  if (query_dag_) {
5454  auto candidate = query_dag_->getQueryHint(filter);
5455  if (candidate) {
5456  query_hint = *candidate;
5457  }
5458  }
5459  auto join_info = QueryPlanDagExtractor::extractJoinInfo(
5460  filter, std::nullopt, getLeftDeepJoinTreesInfo(), executor_);
5461  return {{input_descs,
5462  input_col_descs,
5463  {},
5464  {rewritten_qual ? rewritten_qual : qual},
5465  {},
5466  {nullptr},
5467  target_exprs,
5468  {},
5469  nullptr,
5470  sort_info,
5471  0,
5472  query_hint,
5473  QueryPlanDagExtractor::applyLimitClauseToCacheKey(
5474  filter->getQueryPlanDagHash(), sort_info),
5475  join_info.hash_table_plan_dag,
5476  join_info.table_id_to_node_map},
5477  filter,
5478  g_default_max_groups_buffer_entry_guess,
5479  nullptr};
5480 }
void setOutputMetainfo(std::vector< TargetMetaInfo > targets_metainfo) const
Definition: RelAlgDag.h:850
#define CHECK_EQ(x, y)
Definition: Logger.h:301
std::vector< Analyzer::Expr * > get_raw_pointers(std::vector< std::shared_ptr< Analyzer::Expr >> const &input)
std::unordered_map< const RelAlgNode *, int > get_input_nest_levels(const RelAlgNode *ra_node, const std::vector< size_t > &input_permutation)
static ExtractedJoinInfo extractJoinInfo(const RelAlgNode *top_node, std::optional< unsigned > left_deep_tree_id, std::unordered_map< unsigned, JoinQualsPerNestingLevel > left_deep_tree_infos, Executor *executor)
static size_t applyLimitClauseToCacheKey(size_t cache_key, SortInfo const &sort_info)
std::tuple< std::vector< InputDescriptor >, std::list< std::shared_ptr< const InputColDescriptor > >, std::vector< std::shared_ptr< RexInput > > > get_input_desc(const RA *ra_node, const std::unordered_map< const RelAlgNode *, int > &input_to_nest_level, const std::vector< size_t > &input_permutation)
std::pair< std::vector< TargetMetaInfo >, std::vector< std::shared_ptr< Analyzer::Expr > > > get_inputs_meta(const RelFilter *filter, const RelAlgTranslator &translator, const std::vector< std::shared_ptr< RexInput >> &inputs_owned, const std::unordered_map< const RelAlgNode *, int > &input_to_nest_level)
const RexScalar * getCondition() const
Definition: RelAlgDag.h:1898
std::unordered_map< unsigned, JoinQualsPerNestingLevel > & getLeftDeepJoinTreesInfo()
Analyzer::ExpressionPtr rewrite_expr(const Analyzer::Expr *expr)
size_t getQueryPlanDagHash() const
Definition: RelAlgDag.h:863
std::vector< std::shared_ptr< Analyzer::Expr > > target_exprs_owned_
JoinType get_join_type(const RelAlgNode *ra)
std::unique_ptr< RelAlgDag > query_dag_
static RegisteredQueryHint defaults()
Definition: QueryHint.h:364
std::shared_ptr< const query_state::QueryState > query_state_
size_t g_default_max_groups_buffer_entry_guess
Definition: Execute.cpp:114
std::vector< InputTableInfo > get_table_infos(const std::vector< InputDescriptor > &input_descs, Executor *executor)
const size_t inputCount() const
Definition: RelAlgDag.h:875
Executor * executor_
std::shared_ptr< Analyzer::Expr > fold_expr(const Analyzer::Expr *expr)

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

WorkUnit RelAlgExecutor::createJoinWorkUnit ( const RelJoin ,
const SortInfo ,
const bool  just_explain 
)
private
RelAlgExecutor::WorkUnit RelAlgExecutor::createProjectWorkUnit ( const RelProject project,
const SortInfo sort_info,
const ExecutionOptions eo 
)
private

Definition at line 4938 of file RelAlgExecutor.cpp.

References QueryPlanDagExtractor::applyLimitClauseToCacheKey(), convert_bbox_intersect_join(), RegisteredQueryHint::defaults(), anonymous_namespace{RelAlgExecutor.cpp}::do_table_reordering(), executor_, ExecutionOptions::executor_type, QueryPlanDagExtractor::extractJoinInfo(), g_default_max_groups_buffer_entry_guess, g_from_table_reordering, anonymous_namespace{RelAlgExecutor.cpp}::get_input_desc(), anonymous_namespace{RelAlgExecutor.cpp}::get_input_nest_levels(), anonymous_namespace{RelAlgExecutor.cpp}::get_join_type(), anonymous_namespace{RelAlgExecutor.cpp}::get_left_deep_join_input_sizes(), anonymous_namespace{RelAlgExecutor.cpp}::get_raw_pointers(), get_table_infos(), anonymous_namespace{RelAlgExecutor.cpp}::get_targets_meta(), RelAlgNode::getInput(), getLeftDeepJoinTreesInfo(), RelAlgNode::getQueryPlanDagHash(), anonymous_namespace{RelAlgExecutor.cpp}::has_valid_query_plan_dag(), ExecutionOptions::just_explain, anonymous_namespace{RelAlgExecutor.cpp}::left_deep_join_types(), now_, shared::printContainer(), query_dag_, query_state_, RelAlgNode::setOutputMetainfo(), target_exprs_owned_, anonymous_namespace{RelAlgExecutor.cpp}::translate_scalar_sources(), translateLeftDeepJoinFilter(), and VLOG.

Referenced by createWorkUnit(), executeDelete(), executeProject(), executeUpdate(), and getOuterFragmentCount().

4941  {
4942  std::vector<InputDescriptor> input_descs;
4943  std::list<std::shared_ptr<const InputColDescriptor>> input_col_descs;
4944  auto input_to_nest_level = get_input_nest_levels(project, {});
4945  std::tie(input_descs, input_col_descs, std::ignore) =
4946  get_input_desc(project, input_to_nest_level, {});
4947  VLOG(3) << "input_descs=" << shared::printContainer(input_descs);
4948  VLOG(3) << "input_col_descs=" << shared::printContainer(input_col_descs);
4949  auto query_infos = get_table_infos(input_descs, executor_);
4950  const auto left_deep_join =
4951  dynamic_cast<const RelLeftDeepInnerJoin*>(project->getInput(0));
4952  JoinQualsPerNestingLevel left_deep_join_quals;
4953  const auto join_types = left_deep_join ? left_deep_join_types(left_deep_join)
4954  : std::vector<JoinType>{get_join_type(project)};
4955  std::vector<size_t> input_permutation;
4956  std::vector<size_t> left_deep_join_input_sizes;
4957  std::optional<unsigned> left_deep_tree_id;
4958  if (left_deep_join) {
4959  left_deep_tree_id = left_deep_join->getId();
4960  left_deep_join_input_sizes = get_left_deep_join_input_sizes(left_deep_join);
4961  left_deep_join_quals = translateLeftDeepJoinFilter(
4962  left_deep_join, input_descs, input_to_nest_level, eo.just_explain);
4963  if (g_from_table_reordering) {
4964  input_permutation = do_table_reordering(input_descs,
4965  input_col_descs,
4966  left_deep_join_quals,
4967  input_to_nest_level,
4968  project,
4969  query_infos,
4970  executor_);
4971  input_to_nest_level = get_input_nest_levels(project, input_permutation);
4972  std::tie(input_descs, input_col_descs, std::ignore) =
4973  get_input_desc(project, input_to_nest_level, input_permutation);
4974  left_deep_join_quals = translateLeftDeepJoinFilter(
4975  left_deep_join, input_descs, input_to_nest_level, eo.just_explain);
4976  }
4977  }
4978  auto const bbox_intersect_qual_info = convert_bbox_intersect_join(left_deep_join_quals,
4979  input_descs,
4980  input_to_nest_level,
4981  input_permutation,
4982  input_col_descs,
4983  executor_);
4984  if (bbox_intersect_qual_info.is_reordered) {
4985  query_infos = get_table_infos(input_descs, executor_);
4986  VLOG(3) << "input_descs=" << shared::printContainer(input_descs);
4987  VLOG(3) << "input_col_descs=" << shared::printContainer(input_col_descs);
4988  }
4989  if (bbox_intersect_qual_info.has_bbox_intersect_join) {
4990  left_deep_join_quals = bbox_intersect_qual_info.join_quals;
4991  }
4992  RelAlgTranslator translator(
4993  query_state_, executor_, input_to_nest_level, join_types, now_, eo.just_explain);
4994  const auto target_exprs_owned =
4995  translate_scalar_sources(project, translator, eo.executor_type);
4996 
4997  target_exprs_owned_.insert(
4998  target_exprs_owned_.end(), target_exprs_owned.begin(), target_exprs_owned.end());
4999  const auto target_exprs = get_raw_pointers(target_exprs_owned);
5000  auto query_hint = RegisteredQueryHint::defaults();
5001  if (query_dag_) {
5002  auto candidate = query_dag_->getQueryHint(project);
5003  if (candidate) {
5004  query_hint = *candidate;
5005  }
5006  }
5007  const RelAlgExecutionUnit exe_unit = {input_descs,
5008  input_col_descs,
5009  {},
5010  {},
5011  left_deep_join_quals,
5012  {nullptr},
5013  target_exprs,
5014  {},
5015  nullptr,
5016  sort_info,
5017  0,
5018  query_hint,
5019  QueryPlanDagExtractor::applyLimitClauseToCacheKey(
5020  project->getQueryPlanDagHash(), sort_info),
5021  {},
5022  {},
5023  false,
5024  std::nullopt,
5025  query_state_};
5026  auto query_rewriter = std::make_unique<QueryRewriter>(query_infos, executor_);
5027  auto rewritten_exe_unit = query_rewriter->rewrite(exe_unit);
5028  const auto targets_meta = get_targets_meta(project, rewritten_exe_unit.target_exprs);
5029  project->setOutputMetainfo(targets_meta);
5030  auto& left_deep_trees_info = getLeftDeepJoinTreesInfo();
5031  if (left_deep_tree_id && left_deep_tree_id.has_value()) {
5032  left_deep_trees_info.emplace(left_deep_tree_id.value(),
5033  rewritten_exe_unit.join_quals);
5034  }
5035  if (has_valid_query_plan_dag(project)) {
5036  auto join_info = QueryPlanDagExtractor::extractJoinInfo(
5037  project, left_deep_tree_id, left_deep_trees_info, executor_);
5038  rewritten_exe_unit.hash_table_build_plan_dag = join_info.hash_table_plan_dag;
5039  rewritten_exe_unit.table_id_to_node_map = join_info.table_id_to_node_map;
5040  }
5041  return {rewritten_exe_unit,
5042  project,
5043  g_default_max_groups_buffer_entry_guess,
5044  std::move(query_rewriter),
5045  input_permutation,
5046  left_deep_join_input_sizes};
5047 }
void setOutputMetainfo(std::vector< TargetMetaInfo > targets_metainfo) const
Definition: RelAlgDag.h:850
std::vector< Analyzer::Expr * > get_raw_pointers(std::vector< std::shared_ptr< Analyzer::Expr >> const &input)
std::unordered_map< const RelAlgNode *, int > get_input_nest_levels(const RelAlgNode *ra_node, const std::vector< size_t > &input_permutation)
std::vector< JoinType > left_deep_join_types(const RelLeftDeepInnerJoin *left_deep_join)
JoinType
Definition: sqldefs.h:174
bool has_valid_query_plan_dag(const RelAlgNode *node)
static ExtractedJoinInfo extractJoinInfo(const RelAlgNode *top_node, std::optional< unsigned > left_deep_tree_id, std::unordered_map< unsigned, JoinQualsPerNestingLevel > left_deep_tree_infos, Executor *executor)
std::vector< size_t > do_table_reordering(std::vector< InputDescriptor > &input_descs, std::list< std::shared_ptr< const InputColDescriptor >> &input_col_descs, const JoinQualsPerNestingLevel &left_deep_join_quals, std::unordered_map< const RelAlgNode *, int > &input_to_nest_level, const RA *node, const std::vector< InputTableInfo > &query_infos, const Executor *executor)
static size_t applyLimitClauseToCacheKey(size_t cache_key, SortInfo const &sort_info)
std::tuple< std::vector< InputDescriptor >, std::list< std::shared_ptr< const InputColDescriptor > >, std::vector< std::shared_ptr< RexInput > > > get_input_desc(const RA *ra_node, const std::unordered_map< const RelAlgNode *, int > &input_to_nest_level, const std::vector< size_t > &input_permutation)
std::unordered_map< unsigned, JoinQualsPerNestingLevel > & getLeftDeepJoinTreesInfo()
std::vector< JoinCondition > JoinQualsPerNestingLevel
size_t getQueryPlanDagHash() const
Definition: RelAlgDag.h:863
std::vector< std::shared_ptr< Analyzer::Expr > > target_exprs_owned_
bool g_from_table_reordering
Definition: Execute.cpp:93
ExecutorType executor_type
const RelAlgNode * getInput(const size_t idx) const
Definition: RelAlgDag.h:877
JoinType get_join_type(const RelAlgNode *ra)
BoundingBoxIntersectJoinTranslationInfo convert_bbox_intersect_join(JoinQualsPerNestingLevel const &join_quals, std::vector< InputDescriptor > &input_descs, std::unordered_map< const RelAlgNode *, int > &input_to_nest_level, std::vector< size_t > &input_permutation, std::list< std::shared_ptr< const InputColDescriptor >> &input_col_desc, Executor const *executor)
std::vector< TargetMetaInfo > get_targets_meta(const RA *ra_node, const std::vector< Analyzer::Expr * > &target_exprs)
std::unique_ptr< RelAlgDag > query_dag_
static RegisteredQueryHint defaults()
Definition: QueryHint.h:364
std::shared_ptr< const query_state::QueryState > query_state_
std::vector< std::shared_ptr< Analyzer::Expr > > translate_scalar_sources(const RA *ra_node, const RelAlgTranslator &translator, const ::ExecutorType executor_type)
size_t g_default_max_groups_buffer_entry_guess
Definition: Execute.cpp:114
std::vector< InputTableInfo > get_table_infos(const std::vector< InputDescriptor > &input_descs, Executor *executor)
PrintContainer< CONTAINER > printContainer(CONTAINER &container)
Definition: misc.h:107
Executor * executor_
#define VLOG(n)
Definition: Logger.h:388
JoinQualsPerNestingLevel translateLeftDeepJoinFilter(const RelLeftDeepInnerJoin *join, const std::vector< InputDescriptor > &input_descs, const std::unordered_map< const RelAlgNode *, int > &input_to_nest_level, const bool just_explain)
std::vector< size_t > get_left_deep_join_input_sizes(const RelLeftDeepInnerJoin *left_deep_join)

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

RelAlgExecutor::WorkUnit RelAlgExecutor::createSortInputWorkUnit ( const RelSort sort,
std::list< Analyzer::OrderEntry > &  order_entries,
const ExecutionOptions eo 
)
private

Definition at line 3387 of file RelAlgExecutor.cpp.

References CHECK_GT, RelSort::collationCount(), SpeculativeTopNBlacklist::contains(), createWorkUnit(), Default, anonymous_namespace{RelAlgExecutor.cpp}::first_oe_is_desc(), g_default_max_groups_buffer_entry_guess, anonymous_namespace{RelAlgExecutor.cpp}::get_scan_limit(), get_target_info(), RelAlgNode::getInput(), RelSort::getLimit(), RelSort::getOffset(), RelAlgExecutionUnit::input_descs, RelAlgNode::setOutputMetainfo(), speculative_topn_blacklist_, SpeculativeTopN, and StreamingTopN.

Referenced by executeRelAlgQuerySingleStep(), and executeSort().

3390  {
3391  const auto source = sort->getInput(0);
3392  auto limit = sort->getLimit();
3393  const size_t offset = sort->getOffset();
3394  const size_t scan_limit = sort->collationCount() ? 0 : get_scan_limit(source, limit);
3395  const size_t scan_total_limit =
3396  scan_limit ? get_scan_limit(source, scan_limit + offset) : 0;
3397  size_t max_groups_buffer_entry_guess{
3398  scan_total_limit ? scan_total_limit : g_default_max_groups_buffer_entry_guess};
3399  SortAlgorithm sort_algorithm{SortAlgorithm::SpeculativeTopN};
3400  SortInfo sort_info{order_entries, sort_algorithm, limit, offset};
3401  auto source_work_unit = createWorkUnit(source, sort_info, eo);
3402  const auto& source_exe_unit = source_work_unit.exe_unit;
3403 
3404  // we do not allow sorting geometry or array types
3405  for (auto order_entry : order_entries) {
3406  CHECK_GT(order_entry.tle_no, 0); // tle_no is a 1-base index
3407  const auto& te = source_exe_unit.target_exprs[order_entry.tle_no - 1];
3408  const auto& ti = get_target_info(te, false);
3409  if (ti.sql_type.is_geometry() || ti.sql_type.is_array()) {
3410  throw std::runtime_error(
3411  "Columns with geometry or array types cannot be used in an ORDER BY clause.");
3412  }
3413  }
3414 
3415  if (source_exe_unit.groupby_exprs.size() == 1) {
3416  if (!source_exe_unit.groupby_exprs.front()) {
3417  sort_algorithm = SortAlgorithm::StreamingTopN;
3418  } else {
3419  if (speculative_topn_blacklist_.contains(source_exe_unit.groupby_exprs.front(),
3420  first_oe_is_desc(order_entries))) {
3421  sort_algorithm = SortAlgorithm::Default;
3422  }
3423  }
3424  }
3425 
3426  sort->setOutputMetainfo(source->getOutputMetainfo());
3427  // NB: the `body` field of the returned `WorkUnit` needs to be the `source` node,
3428  // not the `sort`. The aggregator needs the pre-sorted result from leaves.
3429  return {RelAlgExecutionUnit{source_exe_unit.input_descs,
3430  std::move(source_exe_unit.input_col_descs),
3431  source_exe_unit.simple_quals,
3432  source_exe_unit.quals,
3433  source_exe_unit.join_quals,
3434  source_exe_unit.groupby_exprs,
3435  source_exe_unit.target_exprs,
3436  source_exe_unit.target_exprs_original_type_infos,
3437  nullptr,
3438  {sort_info.order_entries, sort_algorithm, limit, offset},
3439  scan_total_limit,
3440  source_exe_unit.query_hint,
3441  source_exe_unit.query_plan_dag_hash,
3442  source_exe_unit.hash_table_build_plan_dag,
3443  source_exe_unit.table_id_to_node_map,
3444  source_exe_unit.use_bump_allocator,
3445  source_exe_unit.union_all,
3446  source_exe_unit.query_state},
3447  source,
3448  max_groups_buffer_entry_guess,
3449  std::move(source_work_unit.query_rewriter),
3450  source_work_unit.input_permutation,
3451  source_work_unit.left_deep_join_input_sizes};
3452 }
void setOutputMetainfo(std::vector< TargetMetaInfo > targets_metainfo) const
Definition: RelAlgDag.h:850
size_t getOffset() const
Definition: RelAlgDag.h:2228
bool contains(const std::shared_ptr< Analyzer::Expr > expr, const bool desc) const
static SpeculativeTopNBlacklist speculative_topn_blacklist_
std::vector< InputDescriptor > input_descs
#define CHECK_GT(x, y)
Definition: Logger.h:305
TargetInfo get_target_info(const Analyzer::Expr *target_expr, const bool bigint_count)
Definition: TargetInfo.h:92
WorkUnit createWorkUnit(const RelAlgNode *, const SortInfo &, const ExecutionOptions &eo)
const RelAlgNode * getInput(const size_t idx) const
Definition: RelAlgDag.h:877
size_t get_scan_limit(const RelAlgNode *ra, std::optional< size_t > limit)
size_t collationCount() const
Definition: RelAlgDag.h:2211
size_t g_default_max_groups_buffer_entry_guess
Definition: Execute.cpp:114
std::optional< size_t > getLimit() const
Definition: RelAlgDag.h:2226
bool first_oe_is_desc(const std::list< Analyzer::OrderEntry > &order_entries)

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

RelAlgExecutor::TableFunctionWorkUnit RelAlgExecutor::createTableFunctionWorkUnit ( const RelTableFunction table_func,
const bool  just_explain,
const bool  is_gpu 
)
private

Definition at line 5180 of file RelAlgExecutor.cpp.

References bind_table_function(), CHECK, CHECK_EQ, CHECK_GT, CHECK_LT, RelTableFunction::countRexLiteralArgs(), DEFAULT_ROW_MULTIPLIER_VALUE, executor_, ext_arg_type_to_type_info(), anonymous_namespace{RelAlgExecutor.cpp}::get_input_desc(), anonymous_namespace{RelAlgExecutor.cpp}::get_input_nest_levels(), anonymous_namespace{RelAlgExecutor.cpp}::get_raw_pointers(), get_table_infos(), anonymous_namespace{RelAlgExecutor.cpp}::get_targets_meta(), RelTableFunction::getColInputsSize(), RelTableFunction::getFunctionName(), RelTableFunction::getTableFuncInputAt(), IS_GEO, kENCODING_ARRAY, kENCODING_NONE, kINT, shared::StringDictKey::kTransientDictKey, LOG, now_, query_state_, RelAlgNode::setOutputMetainfo(), TableFunctions, TableFunctionExecutionUnit::target_exprs, target_exprs_owned_, to_string(), TRANSIENT_DICT_ID, anonymous_namespace{RelAlgExecutor.cpp}::translate_scalar_sources(), UNREACHABLE, and logger::WARNING.

Referenced by executeTableFunction().

// Builds the TableFunctionExecutionUnit for a RelTableFunction node.
// Steps, in order: collect input descriptors, translate the scalar sources into
// input expressions, bind a table function implementation for the requested device,
// resolve the output row sizing parameter, patch the type info of column / column-list
// inputs, create one output ColumnVar per table function output, and finally store the
// output metainfo on the node.
// Throws QueryMustRunOnCpu when binding fails with ExtensionFunctionBindingError and
// is_gpu is true; rethrows the binding error when is_gpu is false. Throws
// std::runtime_error when a user-specified sizing argument is not a literal or is
// negative.
5183  {
5184  std::vector<InputDescriptor> input_descs;
5185  std::list<std::shared_ptr<const InputColDescriptor>> input_col_descs;
5186  auto input_to_nest_level = get_input_nest_levels(rel_table_func, {});
5187  std::tie(input_descs, input_col_descs, std::ignore) =
5188  get_input_desc(rel_table_func, input_to_nest_level, {});
5189  const auto query_infos = get_table_infos(input_descs, executor_);
5190  RelAlgTranslator translator(
5191  query_state_, executor_, input_to_nest_level, {}, now_, just_explain);
5192  auto input_exprs_owned = translate_scalar_sources(
5193  rel_table_func, translator, ::ExecutorType::TableFunctions);
      // The translated inputs are also appended to the member target_exprs_owned_
      // (a vector of shared_ptr), so the member holds a reference to each of them.
5194  target_exprs_owned_.insert(
5195  target_exprs_owned_.end(), input_exprs_owned.begin(), input_exprs_owned.end());
5196 
      // Immediately-invoked lambda so the result can be bound to a const local.
      // [=] captures by copy, so bind_table_function() is called with the lambda's own
      // copy of input_exprs_owned.
5197  const auto table_function_impl_and_type_infos = [=]() {
5198  if (is_gpu) {
5199  try {
5200  return bind_table_function(
5201  rel_table_func->getFunctionName(), input_exprs_owned, is_gpu);
5202  } catch (ExtensionFunctionBindingError& e) {
      // GPU binding failure is not fatal: log it and signal the caller with
      // QueryMustRunOnCpu instead of propagating the binding error.
5203  LOG(WARNING) << "createTableFunctionWorkUnit[GPU]: " << e.what()
5204  << " Redirecting " << rel_table_func->getFunctionName()
5205  << " step to run on CPU.";
5206  throw QueryMustRunOnCpu();
5207  }
5208  } else {
5209  try {
5210  return bind_table_function(
5211  rel_table_func->getFunctionName(), input_exprs_owned, is_gpu);
5212  } catch (ExtensionFunctionBindingError& e) {
      // CPU binding failure: log and rethrow the original exception.
5213  LOG(WARNING) << "createTableFunctionWorkUnit[CPU]: " << e.what();
5214  throw;
5215  }
5216  }
5217  }();
      // Tuple element 0 is the bound implementation, element 1 the per-argument type infos.
5218  const auto& table_function_impl = std::get<0>(table_function_impl_and_type_infos);
5219  const auto& table_function_type_infos = std::get<1>(table_function_impl_and_type_infos);
5220  size_t output_row_sizing_param = 0;
5221  if (table_function_impl
5222  .hasUserSpecifiedOutputSizeParameter()) { // constant and row multiplier
5223  const auto parameter_index =
5224  table_function_impl.getOutputRowSizeParameter(table_function_type_infos);
      // parameter_index is treated as 1-based: it must be > 0 and is used as
      // (parameter_index - 1) when indexing inputs below.
5225  CHECK_GT(parameter_index, size_t(0));
5226  if (rel_table_func->countRexLiteralArgs() == table_function_impl.countScalarArgs()) {
5227  const auto parameter_expr =
5228  rel_table_func->getTableFuncInputAt(parameter_index - 1);
5229  const auto parameter_expr_literal = dynamic_cast<const RexLiteral*>(parameter_expr);
5230  if (!parameter_expr_literal) {
5231  throw std::runtime_error(
5232  "Provided output buffer sizing parameter is not a literal. Only literal "
5233  "values are supported with output buffer sizing configured table "
5234  "functions.");
5235  }
5236  int64_t literal_val = parameter_expr_literal->getVal<int64_t>();
      // NOTE(review): only negative values are rejected here, so 0 passes this check
      // even though the message says "positive" - confirm which is intended.
5237  if (literal_val < 0) {
5238  throw std::runtime_error("Provided output sizing parameter " +
5239  std::to_string(literal_val) +
5240  " must be positive integer.");
5241  }
5242  output_row_sizing_param = static_cast<size_t>(literal_val);
5243  } else {
5244  // RowMultiplier not specified in the SQL query. Set it to 1
5245  output_row_sizing_param = 1; // default value for RowMultiplier
      // Function-local statics: the default constant expression is created on first
      // use and the same shared_ptr is reused by later calls.
5246  static Datum d = {DEFAULT_ROW_MULTIPLIER_VALUE};
5247  static Analyzer::ExpressionPtr DEFAULT_ROW_MULTIPLIER_EXPR =
5248  makeExpr<Analyzer::Constant>(kINT, false, d);
5249  // Push the constant 1 to input_exprs
      // Only the local input_exprs_owned is modified; target_exprs_owned_ was
      // filled above and is not updated with this expression.
5250  input_exprs_owned.insert(input_exprs_owned.begin() + parameter_index - 1,
5251  DEFAULT_ROW_MULTIPLIER_EXPR);
5252  }
5253  } else if (table_function_impl.hasNonUserSpecifiedOutputSize()) {
5254  output_row_sizing_param = table_function_impl.getOutputRowSizeParameter();
5255  } else {
5256  UNREACHABLE();
5257  }
5258 
      // Walk the bound argument type infos. arg_index advances once per declared
      // argument, while input_index advances once per input expression: a column list
      // consumes ti.get_dimension() input expressions, every other argument consumes one.
5259  std::vector<Analyzer::ColumnVar*> input_col_exprs;
5260  size_t input_index = 0;
5261  size_t arg_index = 0;
5262  const auto table_func_args = table_function_impl.getInputArgs();
5263  CHECK_EQ(table_func_args.size(), table_function_type_infos.size());
5264  for (const auto& ti : table_function_type_infos) {
5265  if (ti.is_column_list()) {
5266  for (int i = 0; i < ti.get_dimension(); i++) {
5267  auto& input_expr = input_exprs_owned[input_index];
5268  auto col_var = dynamic_cast<Analyzer::ColumnVar*>(input_expr.get());
5269  CHECK(col_var);
5270 
5271  // avoid setting type info to ti here since ti doesn't have all the
5272  // properties correctly set
5273  auto type_info = input_expr->get_type_info();
5274  if (ti.is_column_array()) {
5275  type_info.set_compression(kENCODING_ARRAY);
      // NOTE(review): this assigns the subtype to itself, i.e. the subtype is left
      // unchanged; the trailing comment does not describe what the call does.
5276  type_info.set_subtype(type_info.get_subtype()); // set type to be subtype
5277  } else {
5278  type_info.set_subtype(type_info.get_type()); // set type to be subtype
5279  }
5280  type_info.set_type(ti.get_type()); // set type to column list
5281  type_info.set_dimension(ti.get_dimension());
5282  type_info.setUsesFlatBuffer(type_info.get_elem_type().supportsFlatBuffer());
      // The patched type info is written back to the shared input expression.
5283  input_expr->set_type_info(type_info);
5284 
5285  input_col_exprs.push_back(col_var);
5286  input_index++;
5287  }
5288  } else if (ti.is_column()) {
5289  auto& input_expr = input_exprs_owned[input_index];
5290  auto col_var = dynamic_cast<Analyzer::ColumnVar*>(input_expr.get());
5291  CHECK(col_var);
5292  // same here! avoid setting type info to ti since it doesn't have all the
5293  // properties correctly set
5294  auto type_info = input_expr->get_type_info();
5295  if (ti.is_column_array()) {
5296  type_info.set_compression(kENCODING_ARRAY);
      // NOTE(review): self-assignment of the subtype, same as in the column-list
      // branch above.
5297  type_info.set_subtype(type_info.get_subtype()); // set type to be subtype
5298  } else {
5299  type_info.set_subtype(type_info.get_type()); // set type to be subtype
5300  }
5301  type_info.set_type(ti.get_type()); // set type to column
5302  type_info.setUsesFlatBuffer(type_info.get_elem_type().supportsFlatBuffer());
5303  input_expr->set_type_info(type_info);
5304  input_col_exprs.push_back(col_var);
5305  input_index++;
5306  } else {
      // Non-column argument: if the declared argument type differs from the
      // expression's type, replace the slot with a cast expression. input_expr is a
      // copy of the shared_ptr (auto, not auto&), so the slot is assigned explicitly.
5307  auto input_expr = input_exprs_owned[input_index];
5308  auto ext_func_arg_ti = ext_arg_type_to_type_info(table_func_args[arg_index]);
5309  if (ext_func_arg_ti != input_expr->get_type_info()) {
5310  input_exprs_owned[input_index] = input_expr->add_cast(ext_func_arg_ti);
5311  }
5312  input_index++;
5313  }
5314  arg_index++;
5315  }
5316  CHECK_EQ(input_col_exprs.size(), rel_table_func->getColInputsSize());
      // Build one output ColumnVar per table function output. For geometry and
      // dictionary-encoded outputs, getInputID(i).first selects the input the output
      // takes its properties from; transient_pos (-1) means there is no such input.
5317  std::vector<Analyzer::Expr*> table_func_outputs;
5318  constexpr int32_t transient_pos{-1};
5319  for (size_t i = 0; i < table_function_impl.getOutputsSize(); i++) {
5320  auto ti = table_function_impl.getOutputSQLType(i);
5321  ti.setUsesFlatBuffer(ti.supportsFlatBuffer());
5322  if (ti.is_geometry()) {
5323  auto p = table_function_impl.getInputID(i);
5324  int32_t input_pos = p.first;
5325  if (input_pos != transient_pos) {
5326  CHECK(!ti.is_column_list()); // see QE-472
5327  CHECK_LT(input_pos, input_exprs_owned.size());
      // Copy SRIDs and compression settings from the referenced geo input.
5328  const auto& reference_ti = input_exprs_owned[input_pos]->get_type_info();
5329  CHECK(IS_GEO(reference_ti.get_type()) || IS_GEO(reference_ti.get_subtype()));
5330  ti.set_input_srid(reference_ti.get_input_srid());
5331  ti.set_output_srid(reference_ti.get_output_srid());
5332  ti.set_compression(reference_ti.get_compression());
5333  ti.set_comp_param(reference_ti.get_comp_param());
5334  } else {
      // No reference input: SRIDs and comp_param are zeroed, compression is none.
5335  ti.set_input_srid(0);
5336  ti.set_output_srid(0);
5337  ti.set_compression(kENCODING_NONE);
5338  ti.set_comp_param(0);
5339  }
5340  } else if (ti.is_dict_encoded_string() || ti.is_text_encoding_dict_array()) {
5341  auto p = table_function_impl.getInputID(i);
5342 
5343  int32_t input_pos = p.first;
5344  if (input_pos == transient_pos) {
5345  ti.set_comp_param(TRANSIENT_DICT_ID);
5346  ti.setStringDictKey(shared::StringDictKey::kTransientDictKey);
5347  } else {
5348  // Iterate over the list of arguments to compute the offset. Use this offset to
5349  // get the corresponding input
5350  int32_t offset = 0;
5351  for (int j = 0; j < input_pos; j++) {
      // NOTE: this `ti` is a copy of the j-th argument type info and shadows the
      // output `ti` declared at the top of the enclosing loop body.
5352  const auto ti = table_function_type_infos[j];
5353  offset += ti.is_column_list() ? ti.get_dimension() : 1;
5354  }
      // p.second is added to the offset of the argument; presumably it is the index
      // within a column list - confirm against getInputID().
5355  input_pos = offset + p.second;
5356 
5357  CHECK_LT(input_pos, input_exprs_owned.size());
5358  const auto& dict_key =
5359  input_exprs_owned[input_pos]->get_type_info().getStringDictKey();
5360  ti.set_comp_param(dict_key.dict_id);
5361  ti.setStringDictKey(dict_key);
5362  }
5363  }
      // The new output expression is stored in target_exprs_owned_; table_func_outputs
      // only keeps the raw pointer obtained from that shared_ptr.
5364  target_exprs_owned_.push_back(std::make_shared<Analyzer::ColumnVar>(
5365  ti, shared::ColumnKey{0, 0, int32_t(i)}, -1));
5366  table_func_outputs.push_back(target_exprs_owned_.back().get());
5367  }
5368  auto input_exprs = get_raw_pointers(input_exprs_owned);
5369  const TableFunctionExecutionUnit exe_unit = {
5370  input_descs,
5371  input_col_descs,
5372  input_exprs, // table function inputs
5373  input_col_exprs, // table function column inputs (duplicates w/ above)
5374  table_func_outputs, // table function projected exprs
5375  output_row_sizing_param, // output buffer sizing param
5376  table_function_impl};
      // Side effect on the node: its output metainfo is set from the unit's target_exprs.
5377  const auto targets_meta = get_targets_meta(rel_table_func, exe_unit.target_exprs);
5378  rel_table_func->setOutputMetainfo(targets_meta);
5379  return {exe_unit, rel_table_func};
5380 }
#define CHECK_EQ(x, y)
Definition: Logger.h:301
std::vector< Analyzer::Expr * > get_raw_pointers(std::vector< std::shared_ptr< Analyzer::Expr >> const &input)
std::unordered_map< const RelAlgNode *, int > get_input_nest_levels(const RelAlgNode *ra_node, const std::vector< size_t > &input_permutation)
std::tuple< std::vector< InputDescriptor >, std::list< std::shared_ptr< const InputColDescriptor > >, std::vector< std::shared_ptr< RexInput > > > get_input_desc(const RA *ra_node, const std::unordered_map< const RelAlgNode *, int > &input_to_nest_level, const std::vector< size_t > &input_permutation)
#define LOG(tag)
Definition: Logger.h:285
#define UNREACHABLE()
Definition: Logger.h:338
std::shared_ptr< Analyzer::Expr > ExpressionPtr
Definition: Analyzer.h:184
#define TRANSIENT_DICT_ID
Definition: DbObjectKeys.h:24
#define CHECK_GT(x, y)
Definition: Logger.h:305
std::string to_string(char const *&&v)
std::vector< std::shared_ptr< Analyzer::Expr > > target_exprs_owned_
std::vector< TargetMetaInfo > get_targets_meta(const RA *ra_node, const std::vector< Analyzer::Expr * > &target_exprs)
#define CHECK_LT(x, y)
Definition: Logger.h:303
#define DEFAULT_ROW_MULTIPLIER_VALUE
std::shared_ptr< const query_state::QueryState > query_state_
std::vector< std::shared_ptr< Analyzer::Expr > > translate_scalar_sources(const RA *ra_node, const RelAlgTranslator &translator, const ::ExecutorType executor_type)
const std::tuple< table_functions::TableFunction, std::vector< SQLTypeInfo > > bind_table_function(std::string name, Analyzer::ExpressionPtrVector input_args, const std::vector< table_functions::TableFunction > &table_funcs, const bool is_gpu)
static const StringDictKey kTransientDictKey
Definition: DbObjectKeys.h:45
#define CHECK(condition)
Definition: Logger.h:291
std::vector< InputTableInfo > get_table_infos(const std::vector< InputDescriptor > &input_descs, Executor *executor)
Definition: sqltypes.h:72
std::vector< Analyzer::Expr * > target_exprs
Definition: Datum.h:69
Executor * executor_
#define IS_GEO(T)
Definition: sqltypes.h:310
SQLTypeInfo ext_arg_type_to_type_info(const ExtArgumentType ext_arg_type)

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

RelAlgExecutor::WorkUnit RelAlgExecutor::createUnionWorkUnit ( const RelLogicalUnion logical_union,
const SortInfo sort_info,
const ExecutionOptions eo 
)
private

Definition at line 5073 of file RelAlgExecutor.cpp.

References gpu_enabled::accumulate(), shared::append_move(), CHECK, RelRexToStringConfig::defaults(), RegisteredQueryHint::defaults(), EMPTY_HASHED_PLAN_DAG_KEY, executor_, g_default_max_groups_buffer_entry_guess, anonymous_namespace{RelAlgExecutor.cpp}::get_input_desc(), anonymous_namespace{RelAlgExecutor.cpp}::get_input_nest_levels(), anonymous_namespace{RelAlgExecutor.cpp}::get_raw_pointers(), get_table_infos(), anonymous_namespace{RelAlgExecutor.cpp}::get_targets_meta(), RelAlgNode::getInput(), RelAlgNode::getOutputMetainfo(), RelLogicalUnion::isAll(), shared::printContainer(), query_state_, RelAlgNode::setOutputMetainfo(), anonymous_namespace{RelAlgExecutor.cpp}::target_exprs_for_union(), target_exprs_owned_, RelAlgNode::toString(), and VLOG.

Referenced by executeUnion().

// Builds the WorkUnit for a RelLogicalUnion node.
// Collects the input descriptors, gathers the target expressions of both inputs,
// assembles a RelAlgExecutionUnit, runs it through QueryRewriter::rewrite(), and sets
// the union node's output metainfo from the rewritten unit.
// Throws QueryNotSupported when the first input is a RelSort or any node type that is
// not handled by the dynamic_cast chain below.
5076  {
5077  std::vector<InputDescriptor> input_descs;
5078  std::list<std::shared_ptr<const InputColDescriptor>> input_col_descs;
5079  // Map ra input ptr to index (0, 1).
5080  auto input_to_nest_level = get_input_nest_levels(logical_union, {});
5081  std::tie(input_descs, input_col_descs, std::ignore) =
5082  get_input_desc(logical_union, input_to_nest_level, {});
5083  const auto query_infos = get_table_infos(input_descs, executor_);
      // Largest tuple count over all inputs (0 when query_infos is empty); it is
      // passed into the execution unit below.
5084  auto const max_num_tuples =
5085  std::accumulate(query_infos.cbegin(),
5086  query_infos.cend(),
5087  size_t(0),
5088  [](auto max, auto const& query_info) {
5089  return std::max(max, query_info.info.getNumTuples());
5090  });
5091 
5092  VLOG(3) << "input_to_nest_level.size()=" << input_to_nest_level.size() << " Pairs are:";
5093  for (auto& pair : input_to_nest_level) {
5094  VLOG(3) << " (" << pair.first->toString(RelRexToStringConfig::defaults()) << ", "
5095  << pair.second << ')';
5096  }
5097 
5098  // For UNION queries, we need to keep the target_exprs from both subqueries since they
5099  // may differ on StringDictionaries.
      // Exactly two inputs (index 0 and 1) are processed.
5100  std::vector<Analyzer::Expr*> target_exprs_pair[2];
5101  for (unsigned i = 0; i < 2; ++i) {
5102  auto input_exprs_owned = target_exprs_for_union(logical_union->getInput(i));
5103  CHECK(!input_exprs_owned.empty())
5104  << "No metainfo found for input node(" << i << ") "
5105  << logical_union->getInput(i)->toString(RelRexToStringConfig::defaults());
5106  VLOG(3) << "i(" << i << ") input_exprs_owned.size()=" << input_exprs_owned.size();
5107  for (auto& input_expr : input_exprs_owned) {
5108  VLOG(3) << " " << input_expr->toString();
5109  }
      // Raw pointers are taken first; the owning vector is then moved into the member
      // target_exprs_owned_.
5110  target_exprs_pair[i] = get_raw_pointers(input_exprs_owned);
5111  shared::append_move(target_exprs_owned_, std::move(input_exprs_owned));
5112  }
5113 
5114  VLOG(3) << "input_descs=" << shared::printContainer(input_descs)
5115  << " input_col_descs=" << shared::printContainer(input_col_descs)
5116  << " target_exprs.size()=" << target_exprs_pair[0].size()
5117  << " max_num_tuples=" << max_num_tuples;
5118 
      // Aggregate initialization: values are positional. The first input's targets go
      // into the target_exprs position, the second input's targets are the last field.
5119  const RelAlgExecutionUnit exe_unit = {input_descs,
5120  input_col_descs,
5121  {}, // quals_cf.simple_quals,
5122  {}, // rewrite_quals(quals_cf.quals),
5123  {},
5124  {nullptr},
5125  target_exprs_pair[0],
5126  {},
5127  nullptr,
5128  sort_info,
5129  max_num_tuples,
      // NOTE(review): listing lines 5130-5131 are absent from this extract. The page's
      // References list names RegisteredQueryHint::defaults() and
      // EMPTY_HASHED_PLAN_DAG_KEY, which appear nowhere else in this listing -
      // presumably they are the two missing initializers; confirm against the source.
5132  {},
5133  {},
5134  false,
5135  logical_union->isAll(),
5136  query_state_,
5137  target_exprs_pair[1]};
5138  auto query_rewriter = std::make_unique<QueryRewriter>(query_infos, executor_);
5139  const auto rewritten_exe_unit = query_rewriter->rewrite(exe_unit);
5140 
      // Output metainfo is derived from the FIRST input only. Every branch makes the same
      // call; the cast chain exists to hand get_targets_meta() (declared with a
      // templated node type `const RA*`) a pointer of the concrete node type.
5141  RelAlgNode const* input0 = logical_union->getInput(0);
5142  if (auto const* node = dynamic_cast<const RelCompound*>(input0)) {
5143  logical_union->setOutputMetainfo(
5144  get_targets_meta(node, rewritten_exe_unit.target_exprs));
5145  } else if (auto const* node = dynamic_cast<const RelProject*>(input0)) {
5146  logical_union->setOutputMetainfo(
5147  get_targets_meta(node, rewritten_exe_unit.target_exprs));
5148  } else if (auto const* node = dynamic_cast<const RelLogicalUnion*>(input0)) {
5149  logical_union->setOutputMetainfo(
5150  get_targets_meta(node, rewritten_exe_unit.target_exprs));
5151  } else if (auto const* node = dynamic_cast<const RelAggregate*>(input0)) {
5152  logical_union->setOutputMetainfo(
5153  get_targets_meta(node, rewritten_exe_unit.target_exprs));
5154  } else if (auto const* node = dynamic_cast<const RelScan*>(input0)) {
5155  logical_union->setOutputMetainfo(
5156  get_targets_meta(node, rewritten_exe_unit.target_exprs));
5157  } else if (auto const* node = dynamic_cast<const RelFilter*>(input0)) {
5158  logical_union->setOutputMetainfo(
5159  get_targets_meta(node, rewritten_exe_unit.target_exprs));
5160  } else if (auto const* node = dynamic_cast<const RelLogicalValues*>(input0)) {
5161  logical_union->setOutputMetainfo(
5162  get_targets_meta(node, rewritten_exe_unit.target_exprs));
5163  } else if (dynamic_cast<const RelSort*>(input0)) {
5164  throw QueryNotSupported("LIMIT and OFFSET are not currently supported with UNION.");
5165  } else {
5166  throw QueryNotSupported("Unsupported input type: " +
      // NOTE(review): listing line 5167 (the right-hand operand of the `+` above and
      // the end of the throw statement) is absent from this extract.
5168  }
      // NOTE(review): input_col_descs.front() is called without an emptiness check -
      // presumably a union always has at least one input column descriptor; confirm.
5169  VLOG(3) << "logical_union->getOutputMetainfo()="
5170  << shared::printContainer(logical_union->getOutputMetainfo())
5171  << " rewritten_exe_unit.input_col_descs.front()->getScanDesc().getTableKey()="
5172  << rewritten_exe_unit.input_col_descs.front()->getScanDesc().getTableKey();
5173 
5174  return {rewritten_exe_unit,
5175  logical_union,
      // NOTE(review): listing line 5176 is absent from this extract. The References
      // list names g_default_max_groups_buffer_entry_guess, which appears nowhere else
      // in this listing - presumably it is the missing initializer; confirm.
5177  std::move(query_rewriter)};
5178 }
bool isAll() const
Definition: RelAlgDag.h:2770
void setOutputMetainfo(std::vector< TargetMetaInfo > targets_metainfo) const
Definition: RelAlgDag.h:850
std::vector< Analyzer::Expr * > get_raw_pointers(std::vector< std::shared_ptr< Analyzer::Expr >> const &input)
std::unordered_map< const RelAlgNode *, int > get_input_nest_levels(const RelAlgNode *ra_node, const std::vector< size_t > &input_permutation)
std::tuple< std::vector< InputDescriptor >, std::list< std::shared_ptr< const InputColDescriptor > >, std::vector< std::shared_ptr< RexInput > > > get_input_desc(const RA *ra_node, const std::unordered_map< const RelAlgNode *, int > &input_to_nest_level, const std::vector< size_t > &input_permutation)
constexpr QueryPlanHash EMPTY_HASHED_PLAN_DAG_KEY
size_t append_move(std::vector< T > &destination, std::vector< T > &&source)
Definition: misc.h:77
std::vector< std::shared_ptr< Analyzer::Expr > > target_exprs_owned_
const RelAlgNode * getInput(const size_t idx) const
Definition: RelAlgDag.h:877
DEVICE auto accumulate(ARGS &&...args)
Definition: gpu_enabled.h:42
std::vector< std::shared_ptr< Analyzer::Expr > > target_exprs_for_union(RelAlgNode const *input_node)
virtual std::string toString(RelRexToStringConfig config=RelRexToStringConfig::defaults()) const =0
std::vector< TargetMetaInfo > get_targets_meta(const RA *ra_node, const std::vector< Analyzer::Expr * > &target_exprs)
static RegisteredQueryHint defaults()
Definition: QueryHint.h:364
std::shared_ptr< const query_state::QueryState > query_state_
static RelRexToStringConfig defaults()
Definition: RelAlgDag.h:78
size_t g_default_max_groups_buffer_entry_guess
Definition: Execute.cpp:114
#define CHECK(condition)
Definition: Logger.h:291
std::vector< InputTableInfo > get_table_infos(const std::vector< InputDescriptor > &input_descs, Executor *executor)
PrintContainer< CONTAINER > printContainer(CONTAINER &container)
Definition: misc.h:107
const std::vector< TargetMetaInfo > & getOutputMetainfo() const
Definition: RelAlgDag.h:865
Executor * executor_
#define VLOG(n)
Definition: Logger.h:388

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

std::unique_ptr< WindowFunctionContext > RelAlgExecutor::createWindowFunctionContext ( const Analyzer::WindowFunction window_func,
const std::shared_ptr< Analyzer::BinOper > &  partition_key_cond,
std::unordered_map< QueryPlanHash, std::shared_ptr< HashJoin >> &  partition_cache,
std::unordered_map< QueryPlanHash, size_t > &  sorted_partition_key_ref_count_map,
const WorkUnit work_unit,
const std::vector< InputTableInfo > &  query_infos,
const CompilationOptions co,
ColumnCacheMap column_cache_map,
std::shared_ptr< RowSetMemoryOwner row_set_mem_owner 
)
private

Definition at line 2559 of file RelAlgExecutor.cpp.

References RegisteredQueryHint::aggregate_tree_fanout, QueryPlanDagExtractor::applyLimitClauseToCacheKey(), RelAlgExecutor::WorkUnit::body, CHECK, CHECK_EQ, Data_Namespace::CPU_LEVEL, CompilationOptions::device_type, RelAlgExecutor::WorkUnit::exe_unit, executor_, g_window_function_aggregation_tree_fanout, Analyzer::WindowFunction::getArgs(), Analyzer::WindowFunction::getCollation(), ColumnFetcher::getOneColumnFragment(), Analyzer::WindowFunction::getOrderKeys(), RelAlgNode::getQueryPlanDagHash(), GPU, Data_Namespace::GPU_LEVEL, RelAlgExecutionUnit::hash_table_build_plan_dag, hash_value(), Analyzer::WindowFunction::isFrameNavigateWindowFunction(), OneToMany, RelAlgExecutionUnit::query_hint, RelAlgExecutionUnit::sort_info, RelAlgExecutionUnit::table_id_to_node_map, VLOG, WINDOW_FUNCTION, and WINDOW_FUNCTION_FRAMING.

Referenced by computeWindow().

// Creates the WindowFunctionContext for window_func.
// With a partition condition, the partition hash table is taken from partition_cache
// or built through executor_->buildHashTableForQualifier() and then cached; without
// one, a context without a partition hash table is created. Afterwards the order-by
// columns and, for framing / missing-value-filling functions, the argument columns
// are fetched and attached to the context.
// Throws std::runtime_error when the hash table build reports a fail_reason or when
// an order key is not a plain column (Analyzer::ColumnVar).
2568  {
      // Element count and all column fetches below use the first fragment of the first
      // input only. NOTE(review): presumably the input is expected to be a single
      // fragment at this point - confirm with the caller.
2569  const size_t elem_count = query_infos.front().info.fragments.front().getNumTuples();
2570  const auto memory_level = co.device_type == ExecutorDeviceType::GPU
      // NOTE(review): listing lines 2571-2572 (the rest of this conditional
      // expression) are absent from this extract. The References list names
      // Data_Namespace::GPU_LEVEL and Data_Namespace::CPU_LEVEL - presumably the two
      // values selected here; confirm against the source.
2573  std::unique_ptr<WindowFunctionContext> context;
2574  auto partition_cache_key = QueryPlanDagExtractor::applyLimitClauseToCacheKey(
2575  work_unit.body->getQueryPlanDagHash(), work_unit.exe_unit.sort_info);
2576  JoinType window_partition_type = window_func->isFrameNavigateWindowFunction()
      // NOTE(review): listing lines 2577-2578 (the rest of this conditional
      // expression) are absent from this extract. The References list names
      // WINDOW_FUNCTION_FRAMING and WINDOW_FUNCTION - presumably the two JoinType
      // values selected here; confirm.
2579  if (partition_key_cond) {
      // The cache key combines the plan hash computed above with the hash of the
      // partition condition's string form and the partition join type.
2580  auto partition_cond_str = partition_key_cond->toString();
2581  auto partition_key_hash = boost::hash_value(partition_cond_str);
2582  boost::hash_combine(partition_cache_key, partition_key_hash);
2583  boost::hash_combine(partition_cache_key, static_cast<int>(window_partition_type));
2584  std::shared_ptr<HashJoin> partition_ptr;
2585  auto cached_hash_table_it = partition_cache.find(partition_cache_key);
2586  if (cached_hash_table_it != partition_cache.end()) {
2587  partition_ptr = cached_hash_table_it->second;
2588  VLOG(1) << "Reuse a hash table to compute window function context (key: "
2589  << partition_cache_key << ", partition condition: " << partition_cond_str
2590  << ")";
2591  } else {
2592  const auto hash_table_or_err = executor_->buildHashTableForQualifier(
2593  partition_key_cond,
2594  query_infos,
2595  memory_level,
2596  window_partition_type,
      // NOTE(review): listing line 2597 (one call argument) is absent from this
      // extract; the References list names OneToMany, which is also the hash type
      // checked on line 2605 - presumably that is the missing argument; confirm.
2598  column_cache_map,
2599  work_unit.exe_unit.hash_table_build_plan_dag,
2600  work_unit.exe_unit.query_hint,
2601  work_unit.exe_unit.table_id_to_node_map);
      // A non-empty fail_reason means the build failed; it is surfaced as an exception.
2602  if (!hash_table_or_err.fail_reason.empty()) {
2603  throw std::runtime_error(hash_table_or_err.fail_reason);
2604  }
2605  CHECK(hash_table_or_err.hash_table->getHashType() == HashType::OneToMany);
2606  partition_ptr = hash_table_or_err.hash_table;
      // The key was not found above, so the insertion is required to succeed.
2607  CHECK(partition_cache.insert(std::make_pair(partition_cache_key, partition_ptr))
2608  .second);
2609  VLOG(1) << "Put a generated hash table for computing window function context to "
2610  "cache (key: "
2611  << partition_cache_key << ", partition condition: " << partition_cond_str
2612  << ")";
2613  }
2614  CHECK(partition_ptr);
      // Start from the global fanout; the query hint's value replaces it whenever the
      // two differ.
2615  auto aggregate_tree_fanout = g_window_function_aggregation_tree_fanout;
2616  if (work_unit.exe_unit.query_hint.aggregate_tree_fanout != aggregate_tree_fanout) {
2617  aggregate_tree_fanout = work_unit.exe_unit.query_hint.aggregate_tree_fanout;
2618  VLOG(1) << "Aggregate tree's fanout is set to " << aggregate_tree_fanout;
2619  }
2620  context = std::make_unique<WindowFunctionContext>(window_func,
2621  partition_cache_key,
2622  partition_ptr,
2623  elem_count,
2624  co.device_type,
2625  row_set_mem_owner,
2626  aggregate_tree_fanout);
2627  } else {
      // No partition condition: the context is built without a partition hash table.
2628  context = std::make_unique<WindowFunctionContext>(
2629  window_func, elem_count, co.device_type, row_set_mem_owner);
2630  }
2631  const auto& order_keys = window_func->getOrderKeys();
2632  if (!order_keys.empty()) {
      // The sorted-partition key extends a copy of the partition key with the string
      // form of every order key and collation entry.
2633  auto sorted_partition_cache_key = partition_cache_key;
2634  for (auto& order_key : order_keys) {
2635  boost::hash_combine(sorted_partition_cache_key, order_key->toString());
2636  }
2637  for (auto& collation : window_func->getCollation()) {
2638  boost::hash_combine(sorted_partition_cache_key, collation.toString());
2639  }
2640  context->setSortedPartitionCacheKey(sorted_partition_cache_key);
      // Reference counting per sorted-partition key: a new key starts at 1, an
      // existing key is incremented by one.
2641  auto cache_key_cnt_it =
2642  sorted_partition_key_ref_count_map.try_emplace(sorted_partition_cache_key, 1);
2643  if (!cache_key_cnt_it.second) {
2644  sorted_partition_key_ref_count_map[sorted_partition_cache_key] =
2645  cache_key_cnt_it.first->second + 1;
2646  }
2647 
2648  std::vector<std::shared_ptr<Chunk_NS::Chunk>> chunks_owner;
2649  for (const auto& order_key : order_keys) {
2650  const auto order_col =
2651  std::dynamic_pointer_cast<const Analyzer::ColumnVar>(order_key);
2652  if (!order_col) {
2653  throw std::runtime_error("Only order by columns supported for now");
2654  }
2655  auto const [column, col_elem_count] =
      // NOTE(review): listing line 2656 (the callee and its first argument) is absent
      // from this extract; the identical argument list on lines 2680-2689 calls
      // ColumnFetcher::getOneColumnFragment(executor_, ...) - presumably the same
      // call is made here; confirm.
2657  *order_col,
2658  query_infos.front().info.fragments.front(),
2659  memory_level,
2660  0,
2661  nullptr,
2662  /*thread_idx=*/0,
2663  chunks_owner,
2664  column_cache_map);
2665 
      // The fetched column must have as many elements as the fragment.
2666  CHECK_EQ(col_elem_count, elem_count);
2667  context->addOrderColumn(column, order_col->get_type_info(), chunks_owner);
2668  }
2669  }
2670  if (context->getWindowFunction()->hasFraming() ||
2671  context->getWindowFunction()->isMissingValueFillingFunction()) {
2672  // todo (yoonmin) : if we try to support generic window function expression without
2673  // extra project node, we need to revisit here b/c the current logic assumes that
2674  // window function expression has a single input source
2675  auto& window_function_expression_args = window_func->getArgs();
2676  std::vector<std::shared_ptr<Chunk_NS::Chunk>> chunks_owner;
      // Only arguments that are plain columns are fetched; other argument expressions
      // are skipped silently.
2677  for (auto& expr : window_function_expression_args) {
2678  if (const auto arg_col_var =
2679  std::dynamic_pointer_cast<const Analyzer::ColumnVar>(expr)) {
2680  auto const [column, col_elem_count] = ColumnFetcher::getOneColumnFragment(
2681  executor_,
2682  *arg_col_var,
2683  query_infos.front().info.fragments.front(),
2684  memory_level,
2685  0,
2686  nullptr,
2687  /*thread_idx=*/0,
2688  chunks_owner,
2689  column_cache_map);
2690  CHECK_EQ(col_elem_count, elem_count);
2691  context->addColumnBufferForWindowFunctionExpression(column, chunks_owner);
2692  }
2693  }
2694  }
2695  return context;
2696 }
#define CHECK_EQ(x, y)
Definition: Logger.h:301
JoinType
Definition: sqldefs.h:174
bool isFrameNavigateWindowFunction() const
Definition: Analyzer.h:2848
static size_t applyLimitClauseToCacheKey(size_t cache_key, SortInfo const &sort_info)
const std::vector< std::shared_ptr< Analyzer::Expr > > & getOrderKeys() const
Definition: Analyzer.h:2802
const std::vector< OrderEntry > & getCollation() const
Definition: Analyzer.h:2820
size_t g_window_function_aggregation_tree_fanout
const std::vector< std::shared_ptr< Analyzer::Expr > > & getArgs() const
Definition: Analyzer.h:2796
ExecutorDeviceType device_type
static std::pair< const int8_t *, size_t > getOneColumnFragment(Executor *executor, const Analyzer::ColumnVar &hash_col, const Fragmenter_Namespace::FragmentInfo &fragment, const Data_Namespace::MemoryLevel effective_mem_lvl, const int device_id, DeviceAllocator *device_allocator, const size_t thread_idx, std::vector< std::shared_ptr< Chunk_NS::Chunk >> &chunks_owner, ColumnCacheMap &column_cache)
Gets one chunk's pointer and element count on either CPU or GPU.
std::size_t hash_value(RexAbstractInput const &rex_ab_input)
Definition: RelAlgDag.cpp:3525
#define CHECK(condition)
Definition: Logger.h:291
Executor * executor_
#define VLOG(n)
Definition: Logger.h:388

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

RelAlgExecutor::WorkUnit RelAlgExecutor::createWorkUnit ( const RelAlgNode node,
const SortInfo sort_info,
const ExecutionOptions eo 
)
private

Definition at line 4356 of file RelAlgExecutor.cpp.

References createAggregateWorkUnit(), createCompoundWorkUnit(), createFilterWorkUnit(), createProjectWorkUnit(), RelRexToStringConfig::defaults(), logger::FATAL, ExecutionOptions::just_explain, LOG, and RelAlgNode::toString().

Referenced by createSortInputWorkUnit(), and getJoinInfo().

// Dispatches work unit creation on the dynamic type of `node`.
// Handled types, tested in this order: RelCompound, RelProject, RelAggregate,
// RelFilter. Compound and project receive the full ExecutionOptions; aggregate and
// filter receive only eo.just_explain. Any other node type is logged at FATAL level.
4358  {
4359  const auto compound = dynamic_cast<const RelCompound*>(node);
4360  if (compound) {
4361  return createCompoundWorkUnit(compound, sort_info, eo);
4362  }
4363  const auto project = dynamic_cast<const RelProject*>(node);
4364  if (project) {
4365  return createProjectWorkUnit(project, sort_info, eo);
4366  }
4367  const auto aggregate = dynamic_cast<const RelAggregate*>(node);
4368  if (aggregate) {
4369  return createAggregateWorkUnit(aggregate, sort_info, eo.just_explain);
4370  }
4371  const auto filter = dynamic_cast<const RelFilter*>(node);
4372  if (filter) {
4373  return createFilterWorkUnit(filter, sort_info, eo.just_explain);
4374  }
4375  LOG(FATAL) << "Unhandled node type: "
      // NOTE(review): listing line 4376 (the rest of this log statement) is absent
      // from this extract; the References list names RelAlgNode::toString() and
      // RelRexToStringConfig::defaults() - presumably the node's string form is
      // streamed here; confirm.
      // The empty WorkUnit below is presumably never returned if LOG(FATAL) aborts -
      // confirm against Logger.h.
4377  return {};
4378 }
WorkUnit createProjectWorkUnit(const RelProject *, const SortInfo &, const ExecutionOptions &eo)
WorkUnit createAggregateWorkUnit(const RelAggregate *, const SortInfo &, const bool just_explain)
#define LOG(tag)
Definition: Logger.h:285
WorkUnit createCompoundWorkUnit(const RelCompound *, const SortInfo &, const ExecutionOptions &eo)
virtual std::string toString(RelRexToStringConfig config=RelRexToStringConfig::defaults()) const =0
static RelRexToStringConfig defaults()
Definition: RelAlgDag.h:78
WorkUnit createFilterWorkUnit(const RelFilter *, const SortInfo &, const bool just_explain)

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void RelAlgExecutor::eraseFromTemporaryTables ( const int  table_id)
inline private

Definition at line 407 of file RelAlgExecutor.h.

References temporary_tables_.

// Removes the entry keyed by table_id from temporary_tables_.
407 { temporary_tables_.erase(table_id); }
TemporaryTables temporary_tables_
ExecutionResult RelAlgExecutor::executeAggregate ( const RelAggregate aggregate,
const CompilationOptions co,
const ExecutionOptions eo,
RenderInfo render_info,
const int64_t  queue_time_ms 
)
private

Definition at line 2296 of file RelAlgExecutor.cpp.

References createAggregateWorkUnit(), DEBUG_TIMER, executeWorkUnit(), RelAlgNode::getOutputMetainfo(), and ExecutionOptions::just_explain.

Referenced by executeRelAlgStep().

// Executes a RelAggregate node: builds its work unit with a default-constructed
// SortInfo and runs it through executeWorkUnit(), returning that call's result.
2300  {
      // Debug timer named after this function.
2301  auto timer = DEBUG_TIMER(__func__);
2302  const auto work_unit = createAggregateWorkUnit(aggregate, SortInfo(), eo.just_explain);
2303  return executeWorkUnit(work_unit,
2304  aggregate->getOutputMetainfo(),
      // is_agg argument: always true for an aggregate node.
2305  true,
2306  co,
2307  eo,
2308  render_info,
2309  queue_time_ms);
2310 }
WorkUnit createAggregateWorkUnit(const RelAggregate *, const SortInfo &, const bool just_explain)
ExecutionResult executeWorkUnit(const WorkUnit &work_unit, const std::vector< TargetMetaInfo > &targets_meta, const bool is_agg, const CompilationOptions &co_in, const ExecutionOptions &eo_in, RenderInfo *, const int64_t queue_time_ms, const std::optional< size_t > previous_count=std::nullopt)
#define DEBUG_TIMER(name)
Definition: Logger.h:412
const std::vector< TargetMetaInfo > & getOutputMetainfo() const
Definition: RelAlgDag.h:865

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

ExecutionResult RelAlgExecutor::executeCompound ( const RelCompound compound,
const CompilationOptions co,
const ExecutionOptions eo,
RenderInfo render_info,
const int64_t  queue_time_ms 
)
private

Definition at line 2279 of file RelAlgExecutor.cpp.

References createCompoundWorkUnit(), DEBUG_TIMER, executeWorkUnit(), RelAlgNode::getOutputMetainfo(), and RelCompound::isAggregate().

Referenced by executeRelAlgStep().

2283  {
2284  auto timer = DEBUG_TIMER(__func__);
2285  const auto work_unit = createCompoundWorkUnit(compound, SortInfo(), eo);
2286  CompilationOptions co_compound = co;
2287  return executeWorkUnit(work_unit,
2288  compound->getOutputMetainfo(),
2289  compound->isAggregate(),
2290  co_compound,
2291  eo,
2292  render_info,
2293  queue_time_ms);
2294 }
WorkUnit createCompoundWorkUnit(const RelCompound *, const SortInfo &, const ExecutionOptions &eo)
ExecutionResult executeWorkUnit(const WorkUnit &work_unit, const std::vector< TargetMetaInfo > &targets_meta, const bool is_agg, const CompilationOptions &co_in, const ExecutionOptions &eo_in, RenderInfo *, const int64_t queue_time_ms, const std::optional< size_t > previous_count=std::nullopt)
bool isAggregate() const
Definition: RelAlgDag.h:2123
#define DEBUG_TIMER(name)
Definition: Logger.h:412
const std::vector< TargetMetaInfo > & getOutputMetainfo() const
Definition: RelAlgDag.h:865

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void RelAlgExecutor::executeDelete ( const RelAlgNode node,
const CompilationOptions co,
const ExecutionOptions eo_in,
const int64_t  queue_time_ms 
)
private

Definition at line 2174 of file RelAlgExecutor.cpp.

References CHECK, CHECK_EQ, Executor::clearExternalCaches(), createCompoundWorkUnit(), createProjectWorkUnit(), DEBUG_TIMER, RelRexToStringConfig::defaults(), dml_transaction_parameters_, executor_, CompilationOptions::filter_on_deleted_column, get_table_infos(), get_temporary_table(), QueryExecutionError::getErrorCode(), getErrorMessageFromCode(), CompilationOptions::makeCpuOnly(), post_execution_callback_, table_is_temporary(), temporary_tables_, and StorageIOFacility::yieldDeleteCallback().

Referenced by executeRelAlgStep().

2177  {
2178  CHECK(node);
2179  auto timer = DEBUG_TIMER(__func__);
2180 
2181  auto execute_delete_for_node = [this, &co, &eo_in](const auto node,
2182  auto& work_unit,
2183  const bool is_aggregate) {
2184  auto* table_descriptor = node->getModifiedTableDescriptor();
2185  CHECK(table_descriptor);
2186  if (!table_descriptor->hasDeletedCol) {
2187  throw std::runtime_error(
2188  "DELETE queries are only supported on tables with the vacuum attribute set to "
2189  "'delayed'");
2190  }
2191 
2192  const auto catalog = node->getModifiedTableCatalog();
2193  CHECK(catalog);
2194  Executor::clearExternalCaches(false, table_descriptor, catalog->getDatabaseId());
2195 
2196  const auto table_infos = get_table_infos(work_unit.exe_unit, executor_);
2197 
2198  auto execute_delete_ra_exe_unit =
2199  [this, &table_infos, &table_descriptor, &eo_in, &co, catalog](
2200  const auto& exe_unit, const bool is_aggregate) {
2201  dml_transaction_parameters_ =
2202  std::make_unique<DeleteTransactionParameters>(table_descriptor, *catalog);
2203  auto delete_params = dynamic_cast<DeleteTransactionParameters*>(
2204  dml_transaction_parameters_.get());
2205  CHECK(delete_params);
2206  auto delete_callback = yieldDeleteCallback(*delete_params);
2207  CompilationOptions co_delete = CompilationOptions::makeCpuOnly(co);
2208 
2209  auto eo = eo_in;
2210  if (dml_transaction_parameters_->tableIsTemporary()) {
2211  eo.output_columnar_hint = true;
2212  co_delete.filter_on_deleted_column =
2213  false; // project the entire delete column for columnar update
2214  } else {
2215  CHECK_EQ(exe_unit.target_exprs.size(), size_t(1));
2216  }
2217 
2218  try {
2219  auto table_update_metadata =
2220  executor_->executeUpdate(exe_unit,
2221  table_infos,
2222  table_descriptor,
2223  co_delete,
2224  eo,
2225  *catalog,
2226  executor_->row_set_mem_owner_,
2227  delete_callback,
2228  is_aggregate);
2229  post_execution_callback_ = [table_update_metadata, this, catalog]() {
2230  dml_transaction_parameters_->finalizeTransaction(*catalog);
2231  TableOptimizer table_optimizer{
2232  dml_transaction_parameters_->getTableDescriptor(), executor_, *catalog};
2233  table_optimizer.vacuumFragmentsAboveMinSelectivity(table_update_metadata);
2234  };
2235  } catch (const QueryExecutionError& e) {
2236  throw std::runtime_error(getErrorMessageFromCode(e.getErrorCode()));
2237  }
2238  };
2239 
2240  if (table_is_temporary(table_descriptor)) {
2241  auto query_rewrite = std::make_unique<QueryRewriter>(table_infos, executor_);
2242  const auto cd = catalog->getDeletedColumn(table_descriptor);
2243  CHECK(cd);
2244  auto delete_column_expr = makeExpr<Analyzer::ColumnVar>(
2245  cd->columnType,
2247  catalog->getDatabaseId(), table_descriptor->tableId, cd->columnId},
2248  0);
2249  const auto rewritten_exe_unit =
2250  query_rewrite->rewriteColumnarDelete(work_unit.exe_unit, delete_column_expr);
2251  execute_delete_ra_exe_unit(rewritten_exe_unit, is_aggregate);
2252  } else {
2253  execute_delete_ra_exe_unit(work_unit.exe_unit, is_aggregate);
2254  }
2255  };
2256 
2257  if (auto compound = dynamic_cast<const RelCompound*>(node)) {
2258  const auto work_unit = createCompoundWorkUnit(compound, SortInfo(), eo_in);
2259  execute_delete_for_node(compound, work_unit, compound->isAggregate());
2260  } else if (auto project = dynamic_cast<const RelProject*>(node)) {
2261  auto work_unit = createProjectWorkUnit(project, SortInfo(), eo_in);
2262  if (project->isSimple()) {
2263  CHECK_EQ(size_t(1), project->inputCount());
2264  const auto input_ra = project->getInput(0);
2265  if (dynamic_cast<const RelSort*>(input_ra)) {
2266  const auto& input_table =
2267  get_temporary_table(&temporary_tables_, -input_ra->getId());
2268  CHECK(input_table);
2269  work_unit.exe_unit.scan_limit = input_table->rowCount();
2270  }
2271  }
2272  execute_delete_for_node(project, work_unit, false);
2273  } else {
2274  throw std::runtime_error("Unsupported parent node for delete: " +
2275  node->toString(RelRexToStringConfig::defaults()));
2276  }
2277 }
#define CHECK_EQ(x, y)
Definition: Logger.h:301
std::optional< std::function< void()> > post_execution_callback_
int32_t getErrorCode() const
Definition: ErrorHandling.h:55
WorkUnit createProjectWorkUnit(const RelProject *, const SortInfo &, const ExecutionOptions &eo)
StorageIOFacility::UpdateCallback yieldDeleteCallback(DeleteTransactionParameters &delete_parameters)
TemporaryTables temporary_tables_
Driver for running cleanup processes on a table. TableOptimizer provides functions for various cleanu...
WorkUnit createCompoundWorkUnit(const RelCompound *, const SortInfo &, const ExecutionOptions &eo)
const ResultSetPtr & get_temporary_table(const TemporaryTables *temporary_tables, const int table_id)
Definition: Execute.h:246
static CompilationOptions makeCpuOnly(const CompilationOptions &in)
bool table_is_temporary(const TableDescriptor *const td)
static RelRexToStringConfig defaults()
Definition: RelAlgDag.h:78
#define CHECK(condition)
Definition: Logger.h:291
std::vector< InputTableInfo > get_table_infos(const std::vector< InputDescriptor > &input_descs, Executor *executor)
#define DEBUG_TIMER(name)
Definition: Logger.h:412
static void clearExternalCaches(bool for_update, const TableDescriptor *td, const int current_db_id)
Definition: Execute.h:438
static std::string getErrorMessageFromCode(const int32_t error_code)
std::unique_ptr< TransactionParameters > dml_transaction_parameters_
Executor * executor_

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

ExecutionResult RelAlgExecutor::executeFilter ( const RelFilter filter,
const CompilationOptions co,
const ExecutionOptions eo,
RenderInfo render_info,
const int64_t  queue_time_ms 
)
private

Definition at line 2698 of file RelAlgExecutor.cpp.

References createFilterWorkUnit(), DEBUG_TIMER, executeWorkUnit(), RelAlgNode::getOutputMetainfo(), and ExecutionOptions::just_explain.

Referenced by executeRelAlgStep().

2702  {
2703  auto timer = DEBUG_TIMER(__func__);
2704  const auto work_unit = createFilterWorkUnit(filter, SortInfo(), eo.just_explain);
2705  return executeWorkUnit(
2706  work_unit, filter->getOutputMetainfo(), false, co, eo, render_info, queue_time_ms);
2707 }
ExecutionResult executeWorkUnit(const WorkUnit &work_unit, const std::vector< TargetMetaInfo > &targets_meta, const bool is_agg, const CompilationOptions &co_in, const ExecutionOptions &eo_in, RenderInfo *, const int64_t queue_time_ms, const std::optional< size_t > previous_count=std::nullopt)
#define DEBUG_TIMER(name)
Definition: Logger.h:412
const std::vector< TargetMetaInfo > & getOutputMetainfo() const
Definition: RelAlgDag.h:865
WorkUnit createFilterWorkUnit(const RelFilter *, const SortInfo &, const bool just_explain)

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

ExecutionResult RelAlgExecutor::executeLogicalValues ( const RelLogicalValues logical_values,
const ExecutionOptions eo 
)
private

Definition at line 2751 of file RelAlgExecutor.cpp.

References CPU, DEBUG_TIMER, executor_, RelLogicalValues::getNumRows(), RelLogicalValues::getTupleType(), kBIGINT, kCOUNT, kNULLT, Projection, query_mem_desc, and RelAlgNode::setOutputMetainfo().

Referenced by executeRelAlgStep().

2753  {
2754  auto timer = DEBUG_TIMER(__func__);
2757 
2758  auto tuple_type = logical_values->getTupleType();
2759  for (size_t i = 0; i < tuple_type.size(); ++i) {
2760  auto& target_meta_info = tuple_type[i];
2761  if (target_meta_info.get_type_info().get_type() == kNULLT) {
2762  // replace w/ bigint
2763  tuple_type[i] =
2764  TargetMetaInfo(target_meta_info.get_resname(), SQLTypeInfo(kBIGINT, false));
2765  }
2766  query_mem_desc.addColSlotInfo(
2767  {std::make_tuple(tuple_type[i].get_type_info().get_size(), 8)});
2768  }
2769  logical_values->setOutputMetainfo(tuple_type);
2770 
2771  std::vector<TargetInfo> target_infos;
2772  for (const auto& tuple_type_component : tuple_type) {
2773  target_infos.emplace_back(TargetInfo{false,
2774  kCOUNT,
2775  tuple_type_component.get_type_info(),
2776  SQLTypeInfo(kNULLT, false),
2777  false,
2778  false,
2779  /*is_varlen_projection=*/false});
2780  }
2781 
2782  std::shared_ptr<ResultSet> rs{
2783  ResultSetLogicalValuesBuilder{logical_values,
2784  target_infos,
2787  executor_->getRowSetMemoryOwner(),
2788  executor_}
2789  .build()};
2790 
2791  return {rs, tuple_type};
2792 }
void setOutputMetainfo(std::vector< TargetMetaInfo > targets_metainfo) const
Definition: RelAlgDag.h:850
size_t getNumRows() const
Definition: RelAlgDag.h:2720
const std::vector< TargetMetaInfo > getTupleType() const
Definition: RelAlgDag.h:2687
Definition: sqldefs.h:78
#define DEBUG_TIMER(name)
Definition: Logger.h:412
Executor * executor_

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

ExecutionResult RelAlgExecutor::executeModify ( const RelModify modify,
const ExecutionOptions eo 
)
private

Definition at line 2842 of file RelAlgExecutor.cpp.

References CPU, DEBUG_TIMER, executor_, and ExecutionOptions::just_explain.

Referenced by executeRelAlgStep().

2843  {
2844  auto timer = DEBUG_TIMER(__func__);
2845  if (eo.just_explain) {
2846  throw std::runtime_error("EXPLAIN not supported for ModifyTable");
2847  }
2848 
2849  auto rs = std::make_shared<ResultSet>(TargetInfoList{},
2852  executor_->getRowSetMemoryOwner(),
2853  executor_->blockSize(),
2854  executor_->gridSize());
2855 
2856  std::vector<TargetMetaInfo> empty_targets;
2857  return {rs, empty_targets};
2858 }
std::vector< TargetInfo > TargetInfoList
#define DEBUG_TIMER(name)
Definition: Logger.h:412
Executor * executor_

+ Here is the caller graph for this function:

void RelAlgExecutor::executePostExecutionCallback ( )

Definition at line 4349 of file RelAlgExecutor.cpp.

References post_execution_callback_, and VLOG.

4349  {
4350  if (post_execution_callback_) {
4351  VLOG(1) << "Running post execution callback.";
4352  (*post_execution_callback_)();
4353  }
4354 }
std::optional< std::function< void()> > post_execution_callback_
#define VLOG(n)
Definition: Logger.h:388
ExecutionResult RelAlgExecutor::executeProject ( const RelProject project,
const CompilationOptions co,
const ExecutionOptions eo,
RenderInfo render_info,
const int64_t  queue_time_ms,
const std::optional< size_t >  previous_count 
)
private

Definition at line 2325 of file RelAlgExecutor.cpp.

References CHECK, CHECK_EQ, CPU, createProjectWorkUnit(), DEBUG_TIMER, CompilationOptions::device_type, executeWorkUnit(), get_temporary_table(), RelAlgNode::getInput(), RelAlgNode::getOutputMetainfo(), RelAlgNode::inputCount(), RelProject::isSimple(), and temporary_tables_.

Referenced by executeRelAlgStep().

2331  {
2332  auto timer = DEBUG_TIMER(__func__);
2333  auto work_unit = createProjectWorkUnit(project, SortInfo(), eo);
2334  CompilationOptions co_project = co;
2335  if (project->isSimple()) {
2336  CHECK_EQ(size_t(1), project->inputCount());
2337  const auto input_ra = project->getInput(0);
2338  if (dynamic_cast<const RelSort*>(input_ra)) {
2339  co_project.device_type = ExecutorDeviceType::CPU;
2340  const auto& input_table =
2341  get_temporary_table(&temporary_tables_, -input_ra->getId());
2342  CHECK(input_table);
2343  work_unit.exe_unit.scan_limit =
2344  std::min(input_table->getLimit(), input_table->rowCount());
2345  }
2346  }
2347  return executeWorkUnit(work_unit,
2348  project->getOutputMetainfo(),
2349  false,
2350  co_project,
2351  eo,
2352  render_info,
2353  queue_time_ms,
2354  previous_count);
2355 }
#define CHECK_EQ(x, y)
Definition: Logger.h:301
WorkUnit createProjectWorkUnit(const RelProject *, const SortInfo &, const ExecutionOptions &eo)
TemporaryTables temporary_tables_
const ResultSetPtr & get_temporary_table(const TemporaryTables *temporary_tables, const int table_id)
Definition: Execute.h:246
const RelAlgNode * getInput(const size_t idx) const
Definition: RelAlgDag.h:877
bool isSimple() const
Definition: RelAlgDag.h:1307
ExecutorDeviceType device_type
ExecutionResult executeWorkUnit(const WorkUnit &work_unit, const std::vector< TargetMetaInfo > &targets_meta, const bool is_agg, const CompilationOptions &co_in, const ExecutionOptions &eo_in, RenderInfo *, const int64_t queue_time_ms, const std::optional< size_t > previous_count=std::nullopt)
#define CHECK(condition)
Definition: Logger.h:291
#define DEBUG_TIMER(name)
Definition: Logger.h:412
const size_t inputCount() const
Definition: RelAlgDag.h:875
const std::vector< TargetMetaInfo > & getOutputMetainfo() const
Definition: RelAlgDag.h:865

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

ExecutionResult RelAlgExecutor::executeRelAlgQuery ( const CompilationOptions & co,
const ExecutionOptions & eo,
const bool  just_explain_plan,
const bool  explain_verbose,
RenderInfo * render_info 
)

Definition at line 563 of file RelAlgExecutor.cpp.

References CHECK, DEBUG_TIMER, executeRelAlgQueryNoRetry(), RenderInfo::forceNonInSitu(), g_allow_cpu_retry, logger::INFO, INJECT_TIMER, RelAlgDag::kBuiltOptimized, LOG, CompilationOptions::makeCpuOnly(), post_execution_callback_, query_dag_, run_benchmark::run_query(), and VLOG.

567  {
568  CHECK(query_dag_);
569  CHECK(query_dag_->getBuildState() == RelAlgDag::BuildState::kBuiltOptimized)
570  << static_cast<int>(query_dag_->getBuildState());
571 
572  auto timer = DEBUG_TIMER(__func__);
574 
575  auto run_query = [&](const CompilationOptions& co_in) {
576  auto execution_result = executeRelAlgQueryNoRetry(
577  co_in, eo, just_explain_plan, explain_verbose, render_info);
578 
579  constexpr bool vlog_result_set_summary{false};
580  if constexpr (vlog_result_set_summary) {
581  VLOG(1) << execution_result.getRows()->summaryToString();
582  }
583 
584  if (post_execution_callback_) {
585  VLOG(1) << "Running post execution callback.";
586  (*post_execution_callback_)();
587  }
588  return execution_result;
589  };
590 
591  try {
592  return run_query(co);
593  } catch (const QueryMustRunOnCpu&) {
594  if (!g_allow_cpu_retry) {
595  throw;
596  }
597  }
598  LOG(INFO) << "Query unable to run in GPU mode, retrying on CPU";
599  auto co_cpu = CompilationOptions::makeCpuOnly(co);
600 
601  if (render_info) {
602  render_info->forceNonInSitu();
603  }
604  return run_query(co_cpu);
605 }
std::optional< std::function< void()> > post_execution_callback_
void forceNonInSitu()
Definition: RenderInfo.cpp:46
#define LOG(tag)
Definition: Logger.h:285
static CompilationOptions makeCpuOnly(const CompilationOptions &in)
#define INJECT_TIMER(DESC)
Definition: measure.h:96
std::unique_ptr< RelAlgDag > query_dag_
ExecutionResult executeRelAlgQueryNoRetry(const CompilationOptions &co, const ExecutionOptions &eo, const bool just_explain_plan, const bool explain_verbose, RenderInfo *render_info)
#define CHECK(condition)
Definition: Logger.h:291
#define DEBUG_TIMER(name)
Definition: Logger.h:412
bool g_allow_cpu_retry
Definition: Execute.cpp:89
ExecutionResult executeRelAlgQuery(const CompilationOptions &co, const ExecutionOptions &eo, const bool just_explain_plan, const bool explain_verbose, RenderInfo *render_info)
#define VLOG(n)
Definition: Logger.h:388

+ Here is the call graph for this function:

ExecutionResult RelAlgExecutor::executeRelAlgQueryNoRetry ( const CompilationOptions & co,
const ExecutionOptions & eo,
const bool  just_explain_plan,
const bool  explain_verbose,
RenderInfo * render_info 
)
private

Definition at line 607 of file RelAlgExecutor.cpp.

References ExecutionOptions::allow_runtime_query_interrupt, CHECK, cleanupPostExecution(), DEBUG_TIMER, Executor::ERR_INTERRUPTED, executeRelAlgQueryWithFilterPushDown(), executeRelAlgSeq(), executor_, ExecutionOptions::find_push_down_candidates, g_enable_dynamic_watchdog, QueryExecutionError::getErrorCode(), getGlobalQueryHint(), RelAlgDag::getNodes(), getParsedQueryHint(), getRelAlgDag(), getRootRelAlgNode(), getSubqueries(), RelAlgDagViewer::handleQueryEngineVector(), INJECT_TIMER, ExecutionOptions::just_calcite_explain, ExecutionOptions::just_explain, ExecutionOptions::just_validate, anonymous_namespace{RelAlgExecutor.cpp}::prepare_for_system_table_execution(), anonymous_namespace{RelAlgExecutor.cpp}::prepare_foreign_table_for_execution(), query_dag_, query_state_, run_benchmark_import::result, RelAlgDag::setGlobalQueryHints(), setupCaching(), timer_start(), and timer_stop().

Referenced by executeRelAlgQuery().

611  {
613  auto timer = DEBUG_TIMER(__func__);
614  auto timer_setup = DEBUG_TIMER("Query pre-execution steps");
615 
616  query_dag_->resetQueryExecutionState();
617  const auto& ra = query_dag_->getRootNode();
618 
619  // capture the lock acquisition time
620  auto clock_begin = timer_start();
621  if (g_enable_dynamic_watchdog) {
622  executor_->resetInterrupt();
623  }
624  std::string query_session{""};
625  std::string query_str{"N/A"};
626  std::string query_submitted_time{""};
627  // gather necessary query's info
628  if (query_state_ != nullptr && query_state_->getConstSessionInfo() != nullptr) {
629  query_session = query_state_->getConstSessionInfo()->get_session_id();
630  query_str = query_state_->getQueryStr();
631  query_submitted_time = query_state_->getQuerySubmittedTime();
632  }
633 
634  auto validate_or_explain_query =
635  just_explain_plan || eo.just_validate || eo.just_explain || eo.just_calcite_explain;
636  auto interruptable = !render_info && !query_session.empty() &&
637  eo.allow_runtime_query_interrupt && !validate_or_explain_query;
638  if (interruptable) {
639  // if we reach here, the current query which was waiting an idle executor
640  // within the dispatch queue is now scheduled to the specific executor
641  // (not UNITARY_EXECUTOR)
642  // so we update the query session's status with the executor that takes this query
643  std::tie(query_session, query_str) = executor_->attachExecutorToQuerySession(
644  query_session, query_str, query_submitted_time);
645 
646  // now the query is going to be executed, so update the status as
647  // "RUNNING_QUERY_KERNEL"
648  executor_->updateQuerySessionStatus(
649  query_session,
650  query_submitted_time,
651  QuerySessionStatus::QueryStatus::RUNNING_QUERY_KERNEL);
652  }
653 
654  // so it should do cleanup session info after finishing its execution
655  ScopeGuard clearQuerySessionInfo =
656  [this, &query_session, &interruptable, &query_submitted_time] {
657  // reset the runtime query interrupt status after the end of query execution
658  if (interruptable) {
659  // cleanup running session's info
660  executor_->clearQuerySessionStatus(query_session, query_submitted_time);
661  }
662  };
663 
664  auto acquire_execute_mutex = [](Executor * executor) -> auto{
665  auto ret = executor->acquireExecuteMutex();
666  return ret;
667  };
668  // now we acquire executor lock in here to make sure that this executor holds
669  // all necessary resources and at the same time protect them against other executor
670  auto lock = acquire_execute_mutex(executor_);
671 
672  if (interruptable) {
673  // check whether this query session is "already" interrupted
674  // this case occurs when there is very short gap between being interrupted and
675  // taking the execute lock
676  // if so we have to remove "all" queries initiated by this session and we do in here
677  // without running the query
678  try {
679  executor_->checkPendingQueryStatus(query_session);
680  } catch (QueryExecutionError& e) {
681  if (e.getErrorCode() == Executor::ERR_INTERRUPTED) {
682  throw std::runtime_error("Query execution has been interrupted (pending query)");
683  }
684  throw e;
685  } catch (...) {
686  throw std::runtime_error("Checking pending query status failed: unknown error");
687  }
688  }
689  int64_t queue_time_ms = timer_stop(clock_begin);
690 
691  prepare_for_system_table_execution(ra, co);
692 
693  // Notify foreign tables to load prior to caching
694  prepare_foreign_table_for_execution(ra);
695 
696  ScopeGuard row_set_holder = [this] { cleanupPostExecution(); };
697  setupCaching(&ra);
698 
699  ScopeGuard restore_metainfo_cache = [this] { executor_->clearMetaInfoCache(); };
700  auto ed_seq = RaExecutionSequence(&ra, executor_);
701 
702  if (just_explain_plan) {
703  const auto& ra = getRootRelAlgNode();
704  auto ed_seq = RaExecutionSequence(&ra, executor_);
705  std::vector<const RelAlgNode*> steps;
706  for (size_t i = 0; i < ed_seq.size(); i++) {
707  steps.emplace_back(ed_seq.getDescriptor(i)->getBody());
708  steps.back()->setStepNumber(i + 1);
709  }
710  std::stringstream ss;
711  auto& nodes = getRelAlgDag()->getNodes();
712  RelAlgDagViewer dagv(false, explain_verbose);
713  dagv.handleQueryEngineVector(nodes);
714  ss << dagv;
715  auto rs = std::make_shared<ResultSet>(ss.str());
716  return {rs, {}};
717  }
718 
719  if (eo.find_push_down_candidates) {
720  // this extra logic is mainly due to current limitations on multi-step queries
721  // and/or subqueries.
722  return executeRelAlgQueryWithFilterPushDown(
723  ed_seq, co, eo, render_info, queue_time_ms);
724  }
725  timer_setup.stop();
726 
727  // Dispatch the subqueries first
728  const auto global_hints = getGlobalQueryHint();
729  for (auto subquery : getSubqueries()) {
730  const auto subquery_ra = subquery->getRelAlg();
731  CHECK(subquery_ra);
732  if (subquery_ra->hasContextData()) {
733  continue;
734  }
735  // Execute the subquery and cache the result.
736  RelAlgExecutor subquery_executor(executor_, query_state_);
737  // Propagate global and local query hint if necessary
738  const auto local_hints = getParsedQueryHint(subquery_ra);
739  if (global_hints || local_hints) {
740  const auto subquery_rel_alg_dag = subquery_executor.getRelAlgDag();
741  if (global_hints) {
742  subquery_rel_alg_dag->setGlobalQueryHints(*global_hints);
743  }
744  if (local_hints) {
745  subquery_rel_alg_dag->registerQueryHint(subquery_ra, *local_hints);
746  }
747  }
748  RaExecutionSequence subquery_seq(subquery_ra, executor_);
749  auto result = subquery_executor.executeRelAlgSeq(subquery_seq, co, eo, nullptr, 0);
750  subquery->setExecutionResult(std::make_shared<ExecutionResult>(result));
751  }
752  return executeRelAlgSeq(ed_seq, co, eo, render_info, queue_time_ms);
753 }
int32_t getErrorCode() const
Definition: ErrorHandling.h:55
static const int32_t ERR_INTERRUPTED
Definition: Execute.h:1623
ExecutionResult executeRelAlgSeq(const RaExecutionSequence &seq, const CompilationOptions &co, const ExecutionOptions &eo, RenderInfo *render_info, const int64_t queue_time_ms, const bool with_existing_temp_tables=false)
ExecutionResult executeRelAlgQueryWithFilterPushDown(const RaExecutionSequence &seq, const CompilationOptions &co, const ExecutionOptions &eo, RenderInfo *render_info, const int64_t queue_time_ms)
void setupCaching(const RelAlgNode *ra)
TypeR::rep timer_stop(Type clock_begin)
Definition: measure.h:48
bool g_enable_dynamic_watchdog
Definition: Execute.cpp:81
std::optional< RegisteredQueryHint > getParsedQueryHint(const RelAlgNode *node)
const std::vector< std::shared_ptr< RexSubQuery > > & getSubqueries() const noexcept
std::vector< std::shared_ptr< RelAlgNode > > & getNodes()
Definition: RelAlgDag.h:2824
#define INJECT_TIMER(DESC)
Definition: measure.h:96
A container for relational algebra descriptors defining the execution order for a relational algebra ...
std::optional< RegisteredQueryHint > getGlobalQueryHint()
std::unique_ptr< RelAlgDag > query_dag_
void prepare_foreign_table_for_execution(const RelAlgNode &ra_node)
std::shared_ptr< const query_state::QueryState > query_state_
const RelAlgNode & getRootRelAlgNode() const
ExecutionResult executeRelAlgQueryNoRetry(const CompilationOptions &co, const ExecutionOptions &eo, const bool just_explain_plan, const bool explain_verbose, RenderInfo *render_info)
void prepare_for_system_table_execution(const RelAlgNode &ra_node, const CompilationOptions &co)
#define CHECK(condition)
Definition: Logger.h:291
#define DEBUG_TIMER(name)
Definition: Logger.h:412
void cleanupPostExecution()
RelAlgDag * getRelAlgDag()
Executor * executor_
Type timer_start()
Definition: measure.h:42

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

QueryStepExecutionResult RelAlgExecutor::executeRelAlgQuerySingleStep ( const RaExecutionSequence & seq,
const size_t  step_idx,
const CompilationOptions & co,
const ExecutionOptions & eo,
RenderInfo * render_info 
)

Definition at line 805 of file RelAlgExecutor.cpp.

References CHECK, CHECK_EQ, anonymous_namespace{RelAlgExecutor.cpp}::check_sort_node_source_constraint(), CPU, createSortInputWorkUnit(), CompilationOptions::device_type, executeRelAlgSubSeq(), RaExecutionSequence::getDescriptor(), GPU, logger::INFO, INJECT_TIMER, ExecutionOptions::just_validate, LOG, anonymous_namespace{RelAlgExecutor.cpp}::node_is_aggregate(), post_execution_callback_, queue_time_ms_, Reduce, GroupByAndAggregate::shard_count_for_top_groups(), gpu_enabled::sort(), Union, and VLOG.

810  {
811  INJECT_TIMER(executeRelAlgQueryStep);
812 
813  auto exe_desc_ptr = seq.getDescriptor(step_idx);
814  CHECK(exe_desc_ptr);
815  const auto sort = dynamic_cast<const RelSort*>(exe_desc_ptr->getBody());
816 
817  size_t shard_count{0};
818  auto merge_type = [&shard_count](const RelAlgNode* body) -> MergeType {
819  return node_is_aggregate(body) && !shard_count ? MergeType::Reduce : MergeType::Union;
820  };
821 
822  if (sort) {
823  check_sort_node_source_constraint(sort);
824  auto order_entries = sort->getOrderEntries();
825  const auto source_work_unit = createSortInputWorkUnit(sort, order_entries, eo);
826  shard_count =
827  GroupByAndAggregate::shard_count_for_top_groups(source_work_unit.exe_unit);
828  if (!shard_count) {
829  // No point in sorting on the leaf, only execute the input to the sort node.
830  CHECK_EQ(size_t(1), sort->inputCount());
831  const auto source = sort->getInput(0);
832  if (sort->collationCount() || node_is_aggregate(source)) {
833  auto temp_seq = RaExecutionSequence(std::make_unique<RaExecutionDesc>(source));
834  CHECK_EQ(temp_seq.size(), size_t(1));
835  ExecutionOptions eo_copy = eo;
836  eo_copy.just_validate = eo.just_validate || sort->isEmptyResult();
837  // Use subseq to avoid clearing existing temporary tables
838  return {
839  executeRelAlgSubSeq(temp_seq, std::make_pair(0, 1), co, eo_copy, nullptr, 0),
840  merge_type(source),
841  source->getId(),
842  false};
843  }
844  }
845  }
846  QueryStepExecutionResult query_step_result{ExecutionResult{},
847  merge_type(exe_desc_ptr->getBody()),
848  exe_desc_ptr->getBody()->getId(),
849  false};
850  try {
851  query_step_result.result = executeRelAlgSubSeq(
852  seq, std::make_pair(step_idx, step_idx + 1), co, eo, render_info, queue_time_ms_);
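853  } catch (QueryMustRunOnCpu const& e) {
854  CHECK_EQ(co.device_type, ExecutorDeviceType::GPU);
855  auto copied_co = co;
856  copied_co.device_type = ExecutorDeviceType::CPU;
857  LOG(INFO) << "Retry the query via CPU mode";
858  query_step_result.result = executeRelAlgSubSeq(seq,
859  std::make_pair(step_idx, step_idx + 1),
860  copied_co,
861  eo,
862  render_info,
863  queue_time_ms_);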
864  }
865  if (post_execution_callback_) {
866  VLOG(1) << "Running post execution callback.";
867  (*post_execution_callback_)();
868  }
869  return query_step_result;
870 }
int64_t queue_time_ms_
#define CHECK_EQ(x, y)
Definition: Logger.h:301
std::optional< std::function< void()> > post_execution_callback_
RaExecutionDesc * getDescriptor(size_t idx) const
#define LOG(tag)
Definition: Logger.h:285
DEVICE void sort(ARGS &&...args)
Definition: gpu_enabled.h:105
void check_sort_node_source_constraint(const RelSort *sort)
#define INJECT_TIMER(DESC)
Definition: measure.h:96
MergeType
A container for relational algebra descriptors defining the execution order for a relational algebra ...
ExecutionResult executeRelAlgSubSeq(const RaExecutionSequence &seq, const std::pair< size_t, size_t > interval, const CompilationOptions &co, const ExecutionOptions &eo, RenderInfo *render_info, const int64_t queue_time_ms)
ExecutorDeviceType device_type
bool node_is_aggregate(const RelAlgNode *ra)
static size_t shard_count_for_top_groups(const RelAlgExecutionUnit &ra_exe_unit)
#define CHECK(condition)
Definition: Logger.h:291
#define VLOG(n)
Definition: Logger.h:388
WorkUnit createSortInputWorkUnit(const RelSort *, std::list< Analyzer::OrderEntry > &order_entries, const ExecutionOptions &eo)

+ Here is the call graph for this function:

ExecutionResult RelAlgExecutor::executeRelAlgQueryWithFilterPushDown ( const RaExecutionSequence & seq,
const CompilationOptions & co,
const ExecutionOptions & eo,
RenderInfo * render_info,
const int64_t  queue_time_ms 
)

Definition at line 148 of file JoinFilterPushDown.cpp.

References CHECK, executeRelAlgSeq(), executor_, ExecutionOptions::find_push_down_candidates, getSubqueries(), ExecutionOptions::just_calcite_explain, run_benchmark_import::result, and RaExecutionSequence::size().

Referenced by executeRelAlgQueryNoRetry().

153  {
154  // we currently do not fully support filter push down with
155  // multi-step execution and/or with subqueries
156  // TODO(Saman): add proper caching to enable filter push down for all cases
157  const auto& subqueries = getSubqueries();
158  if (seq.size() > 1 || !subqueries.empty()) {
159  if (eo.just_calcite_explain) {
160  return ExecutionResult(std::vector<PushedDownFilterInfo>{},
161  eo.find_push_down_candidates);
162  }
163  auto eo_modified = eo;
164  eo_modified.find_push_down_candidates = false;
165  eo_modified.just_calcite_explain = false;
166 
167  // Dispatch the subqueries first
168  for (auto subquery : subqueries) {
169  // Execute the subquery and cache the result.
170  RelAlgExecutor ra_executor(executor_);
171  const auto subquery_ra = subquery->getRelAlg();
172  CHECK(subquery_ra);
173  RaExecutionSequence subquery_seq(subquery_ra, executor_);
174  auto result =
175  ra_executor.executeRelAlgSeq(subquery_seq, co, eo_modified, nullptr, 0);
176  subquery->setExecutionResult(std::make_shared<ExecutionResult>(result));
177  }
178  return executeRelAlgSeq(seq, co, eo_modified, render_info, queue_time_ms);
179  }
180  // else
181  return executeRelAlgSeq(seq, co, eo, render_info, queue_time_ms);
182 }
ExecutionResult executeRelAlgSeq(const RaExecutionSequence &seq, const CompilationOptions &co, const ExecutionOptions &eo, RenderInfo *render_info, const int64_t queue_time_ms, const bool with_existing_temp_tables=false)
const std::vector< std::shared_ptr< RexSubQuery > > & getSubqueries() const noexcept
A container for relational algebra descriptors defining the execution order for a relational algebra ...
#define CHECK(condition)
Definition: Logger.h:291
Executor * executor_

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

ExecutionResult RelAlgExecutor::executeRelAlgSeq ( const RaExecutionSequence &seq,
const CompilationOptions &co,
const ExecutionOptions &eo,
RenderInfo *render_info,
const int64_t  queue_time_ms,
const bool  with_existing_temp_tables = false 
)

Definition at line 889 of file RelAlgExecutor.cpp.

References addTemporaryTable(), CHECK, CHECK_GE, DEBUG_TIMER, CompilationOptions::device_type, RaExecutionSequence::empty(), executeRelAlgStep(), executor_, RenderInfo::forceNonInSitu(), g_allow_query_step_cpu_retry, g_allow_query_step_skipping, g_cluster, g_enable_interop, RaExecutionDesc::getBody(), RaExecutionSequence::getDescriptor(), RaExecutionSequence::getSkippedQueryStepCacheKeys(), GPU, RaExecutionSequence::hasQueryStepForUnion(), logger::INFO, INJECT_TIMER, ExecutionOptions::just_explain, ExecutionOptions::keep_result, left_deep_join_info_, LOG, CompilationOptions::makeCpuOnly(), now_, RaExecutionSequence::size(), gpu_enabled::swap(), target_exprs_owned_, temporary_tables_, and VLOG.

Referenced by executeRelAlgQueryNoRetry(), and executeRelAlgQueryWithFilterPushDown().

894  {
896  auto timer = DEBUG_TIMER(__func__);
897  if (!with_existing_temp_tables) {
899  }
902  executor_->temporary_tables_ = &temporary_tables_;
903 
904  time(&now_);
905  CHECK(!seq.empty());
906 
907  auto get_descriptor_count = [&seq, &eo]() -> size_t {
908  if (eo.just_explain) {
909  if (dynamic_cast<const RelLogicalValues*>(seq.getDescriptor(0)->getBody())) {
910  // run the logical values descriptor to generate the result set, then the next
911  // descriptor to generate the explain
912  CHECK_GE(seq.size(), size_t(2));
913  return 2;
914  } else {
915  return 1;
916  }
917  } else {
918  return seq.size();
919  }
920  };
921 
922  const auto exec_desc_count = get_descriptor_count();
923  auto eo_copied = eo;
924  if (seq.hasQueryStepForUnion()) {
925  // we currently do not support resultset recycling when an input query
926  // contains union (all) operation
927  eo_copied.keep_result = false;
928  }
929 
930  // we have to register the resultset(s) of the skipped query step(s) as temporary
931  // table(s) before executing the remaining query steps, since they may be
932  // required during query processing,
933  // e.g., to get metadata of the target expression from the skipped query step
935  for (const auto& kv : seq.getSkippedQueryStepCacheKeys()) {
936  const auto cached_res =
937  executor_->getResultSetRecyclerHolder().getCachedQueryResultSet(kv.second);
938  CHECK(cached_res);
939  addTemporaryTable(kv.first, cached_res);
940  }
941  }
942 
943  const auto num_steps = exec_desc_count - 1;
944  for (size_t i = 0; i < exec_desc_count; i++) {
945  VLOG(1) << "Executing query step " << i << " / " << num_steps;
946  try {
948  seq, i, co, eo_copied, (i == num_steps) ? render_info : nullptr, queue_time_ms);
949  } catch (const QueryMustRunOnCpu&) {
950  // Do not allow per-step retry if flag is off or in distributed mode
951  // TODO(todd): Determine if and when we can relax this restriction
952  // for distributed
955  throw;
956  }
957  LOG(INFO) << "Retrying current query step " << i << " / " << num_steps << " on CPU";
958  const auto co_cpu = CompilationOptions::makeCpuOnly(co);
959  if (render_info && i == num_steps) {
960  // only render on the last step
961  render_info->forceNonInSitu();
962  }
963  executeRelAlgStep(seq,
964  i,
965  co_cpu,
966  eo_copied,
967  (i == num_steps) ? render_info : nullptr,
968  queue_time_ms);
969  } catch (const NativeExecutionError&) {
970  if (!g_enable_interop) {
971  throw;
972  }
973  auto eo_extern = eo_copied;
974  eo_extern.executor_type = ::ExecutorType::Extern;
975  auto exec_desc_ptr = seq.getDescriptor(i);
976  const auto body = exec_desc_ptr->getBody();
977  const auto compound = dynamic_cast<const RelCompound*>(body);
978  if (compound && (compound->getGroupByCount() || compound->isAggregate())) {
979  LOG(INFO) << "Also failed to run the query using interoperability";
980  throw;
981  }
983  seq, i, co, eo_extern, (i == num_steps) ? render_info : nullptr, queue_time_ms);
984  }
985  }
986 
987  return seq.getDescriptor(num_steps)->getResult();
988 }
RaExecutionDesc * getDescriptor(size_t idx) const
ExecutionResult executeRelAlgSeq(const RaExecutionSequence &seq, const CompilationOptions &co, const ExecutionOptions &eo, RenderInfo *render_info, const int64_t queue_time_ms, const bool with_existing_temp_tables=false)
void forceNonInSitu()
Definition: RenderInfo.cpp:46
const bool hasQueryStepForUnion() const
#define LOG(tag)
Definition: Logger.h:285
bool g_allow_query_step_skipping
Definition: Execute.cpp:159
const std::unordered_map< int, QueryPlanHash > getSkippedQueryStepCacheKeys() const
TemporaryTables temporary_tables_
#define CHECK_GE(x, y)
Definition: Logger.h:306
void addTemporaryTable(const int table_id, const ResultSetPtr &result)
bool g_enable_interop
std::vector< std::shared_ptr< Analyzer::Expr > > target_exprs_owned_
static CompilationOptions makeCpuOnly(const CompilationOptions &in)
#define INJECT_TIMER(DESC)
Definition: measure.h:96
void executeRelAlgStep(const RaExecutionSequence &seq, const size_t step_idx, const CompilationOptions &, const ExecutionOptions &, RenderInfo *, const int64_t queue_time_ms)
ExecutorDeviceType device_type
std::unordered_map< unsigned, JoinQualsPerNestingLevel > left_deep_join_info_
#define CHECK(condition)
Definition: Logger.h:291
#define DEBUG_TIMER(name)
Definition: Logger.h:412
const RelAlgNode * getBody() const
bool g_allow_query_step_cpu_retry
Definition: Execute.cpp:90
bool g_cluster
Executor * executor_
DEVICE void swap(ARGS &&...args)
Definition: gpu_enabled.h:114
#define VLOG(n)
Definition: Logger.h:388

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void RelAlgExecutor::executeRelAlgStep ( const RaExecutionSequence &seq,
const size_t  step_idx,
const CompilationOptions &co,
const ExecutionOptions &eo,
RenderInfo *render_info,
const int64_t  queue_time_ms 
)
private

Definition at line 1148 of file RelAlgExecutor.cpp.

References addTemporaryTable(), QueryPlanDagExtractor::applyLimitClauseToCacheKey(), build_render_targets(), canUseResultsetCache(), CHECK, SortInfo::createFromSortNode(), DEBUG_TIMER, RelRexToStringConfig::defaults(), executeAggregate(), executeCompound(), executeDelete(), executeFilter(), executeLogicalValues(), executeModify(), executeProject(), executeSort(), executeTableFunction(), executeUnion(), executeUpdate(), executor_, logger::FATAL, g_cluster, g_skip_intermediate_count, RelAlgNode::getContextData(), RaExecutionSequence::getDescriptor(), RaExecutionSequence::getDescriptorByBodyId(), RelAlgNode::getId(), RelAlgNode::getInput(), getParsedQueryHint(), anonymous_namespace{RelAlgDag.cpp}::handle_query_hint(), handleNop(), anonymous_namespace{RelAlgExecutor.cpp}::has_valid_query_plan_dag(), RelAlgNode::hasContextData(), RaExecutionSequence::hasQueryStepForUnion(), INJECT_TIMER, LOG, ExecutionOptions::outer_fragment_indices, setHasStepForUnion(), gpu_enabled::sort(), VLOG, and ExecutionOptions::with_watchdog.

Referenced by executeRelAlgSeq(), and executeRelAlgSubSeq().

1153  {
1155  auto timer = DEBUG_TIMER(__func__);
1156  auto exec_desc_ptr = seq.getDescriptor(step_idx);
1157  CHECK(exec_desc_ptr);
1158  auto& exec_desc = *exec_desc_ptr;
1159  const auto body = exec_desc.getBody();
1160  if (body->isNop()) {
1161  handleNop(exec_desc);
1162  return;
1163  }
1164  ExecutionOptions eo_copied = eo;
1165  CompilationOptions co_copied = co;
1166  eo_copied.with_watchdog =
1167  eo.with_watchdog && (step_idx == 0 || dynamic_cast<const RelProject*>(body));
1168  eo_copied.outer_fragment_indices =
1169  step_idx == 0 ? eo.outer_fragment_indices : std::vector<size_t>();
1170 
1171  auto target_node = body;
1172  auto query_plan_dag_hash = body->getQueryPlanDagHash();
1173  if (auto sort_body = dynamic_cast<const RelSort*>(body)) {
1174  target_node = sort_body->getInput(0);
1176  target_node->getQueryPlanDagHash(), SortInfo::createFromSortNode(sort_body));
1177  } else {
1179  target_node->getQueryPlanDagHash(), SortInfo());
1180  }
1181  auto query_hints = getParsedQueryHint(target_node);
1182  if (query_hints) {
1183  handle_query_hint(*query_hints, eo_copied, co_copied);
1184  }
1185 
1187  if (canUseResultsetCache(eo, render_info) && has_valid_query_plan_dag(body)) {
1188  if (auto cached_resultset =
1189  executor_->getResultSetRecyclerHolder().getCachedQueryResultSet(
1190  query_plan_dag_hash)) {
1191  VLOG(1) << "recycle resultset of the root node " << body->getRelNodeDagId()
1192  << " from resultset cache";
1193  body->setOutputMetainfo(cached_resultset->getTargetMetaInfo());
1194  if (render_info) {
1195  std::vector<std::shared_ptr<Analyzer::Expr>>& cached_target_exprs =
1196  executor_->getResultSetRecyclerHolder().getTargetExprs(query_plan_dag_hash);
1197  std::vector<Analyzer::Expr*> copied_target_exprs;
1198  for (const auto& expr : cached_target_exprs) {
1199  copied_target_exprs.push_back(expr.get());
1200  }
1202  *render_info, copied_target_exprs, cached_resultset->getTargetMetaInfo());
1203  }
1204  exec_desc.setResult({cached_resultset, cached_resultset->getTargetMetaInfo()});
1205  addTemporaryTable(-body->getId(), exec_desc.getResult().getDataPtr());
1206  return;
1207  }
1208  }
1209 
1210  const auto compound = dynamic_cast<const RelCompound*>(body);
1211  if (compound) {
1212  if (compound->isDeleteViaSelect()) {
1213  executeDelete(compound, co_copied, eo_copied, queue_time_ms);
1214  } else if (compound->isUpdateViaSelect()) {
1215  executeUpdate(compound, co_copied, eo_copied, queue_time_ms);
1216  } else {
1217  exec_desc.setResult(
1218  executeCompound(compound, co_copied, eo_copied, render_info, queue_time_ms));
1219  VLOG(3) << "Returned from executeCompound(), addTemporaryTable("
1220  << static_cast<int>(-compound->getId()) << ", ...)"
1221  << " exec_desc.getResult().getDataPtr()->rowCount()="
1222  << exec_desc.getResult().getDataPtr()->rowCount();
1223  if (exec_desc.getResult().isFilterPushDownEnabled()) {
1224  return;
1225  }
1226  addTemporaryTable(-compound->getId(), exec_desc.getResult().getDataPtr());
1227  }
1228  return;
1229  }
1230  const auto project = dynamic_cast<const RelProject*>(body);
1231  if (project) {
1232  if (project->isDeleteViaSelect()) {
1233  executeDelete(project, co_copied, eo_copied, queue_time_ms);
1234  } else if (project->isUpdateViaSelect()) {
1235  executeUpdate(project, co_copied, eo_copied, queue_time_ms);
1236  } else {
1237  std::optional<size_t> prev_count;
1238  // Disabling the intermediate count optimization in distributed, as the previous
1239  // execution descriptor will likely not hold the aggregated result.
1240  if (g_skip_intermediate_count && step_idx > 0 && !g_cluster) {
1241  // If the previous node produced a reliable count, skip the pre-flight count.
1242  RelAlgNode const* const prev_body = project->getInput(0);
1243  if (shared::dynamic_castable_to_any<RelCompound, RelLogicalValues>(prev_body)) {
1244  if (RaExecutionDesc const* const prev_exec_desc =
1245  prev_body->hasContextData()
1246  ? prev_body->getContextData()
1247  : seq.getDescriptorByBodyId(prev_body->getId(), step_idx - 1)) {
1248  const auto& prev_exe_result = prev_exec_desc->getResult();
1249  const auto prev_result = prev_exe_result.getRows();
1250  if (prev_result) {
1251  prev_count = prev_result->rowCount();
1252  VLOG(3) << "Setting output row count for projection node to previous node ("
1253  << prev_exec_desc->getBody()->toString(
1255  << ") to " << *prev_count;
1256  }
1257  }
1258  }
1259  }
1260  exec_desc.setResult(executeProject(
1261  project, co_copied, eo_copied, render_info, queue_time_ms, prev_count));
1262  VLOG(3) << "Returned from executeProject(), addTemporaryTable("
1263  << static_cast<int>(-project->getId()) << ", ...)"
1264  << " exec_desc.getResult().getDataPtr()->rowCount()="
1265  << exec_desc.getResult().getDataPtr()->rowCount();
1266  if (exec_desc.getResult().isFilterPushDownEnabled()) {
1267  return;
1268  }
1269  addTemporaryTable(-project->getId(), exec_desc.getResult().getDataPtr());
1270  }
1271  return;
1272  }
1273  const auto aggregate = dynamic_cast<const RelAggregate*>(body);
1274  if (aggregate) {
1275  exec_desc.setResult(
1276  executeAggregate(aggregate, co_copied, eo_copied, render_info, queue_time_ms));
1277  addTemporaryTable(-aggregate->getId(), exec_desc.getResult().getDataPtr());
1278  return;
1279  }
1280  const auto filter = dynamic_cast<const RelFilter*>(body);
1281  if (filter) {
1282  exec_desc.setResult(
1283  executeFilter(filter, co_copied, eo_copied, render_info, queue_time_ms));
1284  addTemporaryTable(-filter->getId(), exec_desc.getResult().getDataPtr());
1285  return;
1286  }
1287  const auto sort = dynamic_cast<const RelSort*>(body);
1288  if (sort) {
1289  exec_desc.setResult(
1290  executeSort(sort, co_copied, eo_copied, render_info, queue_time_ms));
1291  if (exec_desc.getResult().isFilterPushDownEnabled()) {
1292  return;
1293  }
1294  addTemporaryTable(-sort->getId(), exec_desc.getResult().getDataPtr());
1295  return;
1296  }
1297  const auto logical_values = dynamic_cast<const RelLogicalValues*>(body);
1298  if (logical_values) {
1299  exec_desc.setResult(executeLogicalValues(logical_values, eo_copied));
1300  addTemporaryTable(-logical_values->getId(), exec_desc.getResult().getDataPtr());
1301  return;
1302  }
1303  const auto modify = dynamic_cast<const RelModify*>(body);
1304  if (modify) {
1305  exec_desc.setResult(executeModify(modify, eo_copied));
1306  return;
1307  }
1308  const auto logical_union = dynamic_cast<const RelLogicalUnion*>(body);
1309  if (logical_union) {
1310  exec_desc.setResult(executeUnion(
1311  logical_union, seq, co_copied, eo_copied, render_info, queue_time_ms));
1312  addTemporaryTable(-logical_union->getId(), exec_desc.getResult().getDataPtr());
1313  return;
1314  }
1315  const auto table_func = dynamic_cast<const RelTableFunction*>(body);
1316  if (table_func) {
1317  exec_desc.setResult(
1318  executeTableFunction(table_func, co_copied, eo_copied, queue_time_ms));
1319  addTemporaryTable(-table_func->getId(), exec_desc.getResult().getDataPtr());
1320  return;
1321  }
1322  LOG(FATAL) << "Unhandled body type: "
1323  << body->toString(RelRexToStringConfig::defaults());
1324 }
ExecutionResult executeAggregate(const RelAggregate *aggregate, const CompilationOptions &co, const ExecutionOptions &eo, RenderInfo *render_info, const int64_t queue_time_ms)
RaExecutionDesc * getDescriptor(size_t idx) const
bool has_valid_query_plan_dag(const RelAlgNode *node)
static size_t applyLimitClauseToCacheKey(size_t cache_key, SortInfo const &sort_info)
const bool hasQueryStepForUnion() const
bool g_skip_intermediate_count
#define LOG(tag)
Definition: Logger.h:285
std::vector< size_t > outer_fragment_indices
void handle_query_hint(const std::vector< std::shared_ptr< RelAlgNode >> &nodes, RelAlgDag &rel_alg_dag) noexcept
Definition: RelAlgDag.cpp:1570
DEVICE void sort(ARGS &&...args)
Definition: gpu_enabled.h:105
ExecutionResult executeModify(const RelModify *modify, const ExecutionOptions &eo)
void addTemporaryTable(const int table_id, const ResultSetPtr &result)
ExecutionResult executeLogicalValues(const RelLogicalValues *, const ExecutionOptions &)
std::optional< RegisteredQueryHint > getParsedQueryHint(const RelAlgNode *node)
static SortInfo createFromSortNode(const RelSort *sort_node)
ExecutionResult executeFilter(const RelFilter *, const CompilationOptions &, const ExecutionOptions &, RenderInfo *, const int64_t queue_time_ms)
void handleNop(RaExecutionDesc &ed)
unsigned getId() const
Definition: RelAlgDag.h:869
#define INJECT_TIMER(DESC)
Definition: measure.h:96
void build_render_targets(RenderInfo &render_info, const std::vector< Analyzer::Expr * > &work_unit_target_exprs, const std::vector< TargetMetaInfo > &targets_meta)
void executeUpdate(const RelAlgNode *node, const CompilationOptions &co, const ExecutionOptions &eo, const int64_t queue_time_ms)
const RelAlgNode * getInput(const size_t idx) const
Definition: RelAlgDag.h:877
void executeRelAlgStep(const RaExecutionSequence &seq, const size_t step_idx, const CompilationOptions &, const ExecutionOptions &, RenderInfo *, const int64_t queue_time_ms)
ExecutionResult executeTableFunction(const RelTableFunction *, const CompilationOptions &, const ExecutionOptions &, const int64_t queue_time_ms)
const RaExecutionDesc * getContextData() const
Definition: RelAlgDag.h:873
ExecutionResult executeUnion(const RelLogicalUnion *, const RaExecutionSequence &, const CompilationOptions &, const ExecutionOptions &, RenderInfo *, const int64_t queue_time_ms)
static RelRexToStringConfig defaults()
Definition: RelAlgDag.h:78
bool canUseResultsetCache(const ExecutionOptions &eo, RenderInfo *render_info) const
ExecutionResult executeSort(const RelSort *, const CompilationOptions &, const ExecutionOptions &, RenderInfo *, const int64_t queue_time_ms)
ExecutionResult executeProject(const RelProject *, const CompilationOptions &, const ExecutionOptions &, RenderInfo *, const int64_t queue_time_ms, const std::optional< size_t > previous_count)
#define CHECK(condition)
Definition: Logger.h:291
#define DEBUG_TIMER(name)
Definition: Logger.h:412
ExecutionResult executeCompound(const RelCompound *, const CompilationOptions &, const ExecutionOptions &, RenderInfo *, const int64_t queue_time_ms)
bool g_cluster
void executeDelete(const RelAlgNode *node, const CompilationOptions &co, const ExecutionOptions &eo_in, const int64_t queue_time_ms)
RaExecutionDesc * getDescriptorByBodyId(unsigned const body_id, size_t const start_idx) const
Executor * executor_
bool hasContextData() const
Definition: RelAlgDag.h:871
#define VLOG(n)
Definition: Logger.h:388
void setHasStepForUnion(bool flag)

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

ExecutionResult RelAlgExecutor::executeRelAlgSubSeq ( const RaExecutionSequence &seq,
const std::pair< size_t, size_t >  interval,
const CompilationOptions &co,
const ExecutionOptions &eo,
RenderInfo *render_info,
const int64_t  queue_time_ms 
)

Definition at line 990 of file RelAlgExecutor.cpp.

References CHECK, CompilationOptions::device_type, executeRelAlgStep(), executor_, RenderInfo::forceNonInSitu(), g_allow_query_step_cpu_retry, g_cluster, RaExecutionSequence::getDescriptor(), GPU, logger::INFO, INJECT_TIMER, left_deep_join_info_, LOG, CompilationOptions::makeCpuOnly(), now_, gpu_enabled::swap(), and temporary_tables_.

Referenced by executeRelAlgQuerySingleStep().

996  {
998  executor_->temporary_tables_ = &temporary_tables_;
1000  time(&now_);
1001  for (size_t i = interval.first; i < interval.second; i++) {
1002  // only render on the last step
1003  try {
1004  executeRelAlgStep(seq,
1005  i,
1006  co,
1007  eo,
1008  (i == interval.second - 1) ? render_info : nullptr,
1009  queue_time_ms);
1010  } catch (const QueryMustRunOnCpu&) {
1011  // Do not allow per-step retry if flag is off or in distributed mode
1012  // TODO(todd): Determine if and when we can relax this restriction
1013  // for distributed
1016  throw;
1017  }
1018  LOG(INFO) << "Retrying current query step " << i << " on CPU";
1019  const auto co_cpu = CompilationOptions::makeCpuOnly(co);
1020  if (render_info && i == interval.second - 1) {
1021  render_info->forceNonInSitu();
1022  }
1023  executeRelAlgStep(seq,
1024  i,
1025  co_cpu,
1026  eo,
1027  (i == interval.second - 1) ? render_info : nullptr,
1028  queue_time_ms);
1029  }
1030  }
1031 
1032  return seq.getDescriptor(interval.second - 1)->getResult();
1033 }
RaExecutionDesc * getDescriptor(size_t idx) const
void forceNonInSitu()
Definition: RenderInfo.cpp:46
#define LOG(tag)
Definition: Logger.h:285
TemporaryTables temporary_tables_
static CompilationOptions makeCpuOnly(const CompilationOptions &in)
#define INJECT_TIMER(DESC)
Definition: measure.h:96
ExecutionResult executeRelAlgSubSeq(const RaExecutionSequence &seq, const std::pair< size_t, size_t > interval, const CompilationOptions &co, const ExecutionOptions &eo, RenderInfo *render_info, const int64_t queue_time_ms)
void executeRelAlgStep(const RaExecutionSequence &seq, const size_t step_idx, const CompilationOptions &, const ExecutionOptions &, RenderInfo *, const int64_t queue_time_ms)
ExecutorDeviceType device_type
std::unordered_map< unsigned, JoinQualsPerNestingLevel > left_deep_join_info_
#define CHECK(condition)
Definition: Logger.h:291
bool g_allow_query_step_cpu_retry
Definition: Execute.cpp:90
bool g_cluster
Executor * executor_
DEVICE void swap(ARGS &&...args)
Definition: gpu_enabled.h:114

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

ExecutionResult RelAlgExecutor::executeSimpleInsert ( const Analyzer::Query insert_query,
Fragmenter_Namespace::InsertDataLoader inserter,
const Catalog_Namespace::SessionInfo session 
)

Definition at line 2860 of file RelAlgExecutor.cpp.

References append_datum(), DataBlockPtr::arraysPtr, CHECK, CHECK_EQ, checked_malloc(), Executor::clearExternalCaches(), Fragmenter_Namespace::InsertData::columnIds, ColumnDescriptor::columnType, CPU, Fragmenter_Namespace::InsertData::data, Fragmenter_Namespace::InsertData::databaseId, executor_, import_export::fill_missing_columns(), get_column_descriptor(), SQLTypeInfo::get_compression(), SQLTypeInfo::get_elem_type(), Analyzer::Query::get_result_col_list(), Analyzer::Query::get_result_table_id(), SQLTypeInfo::get_size(), SQLTypeInfo::get_type(), Analyzer::Query::get_values_lists(), Catalog_Namespace::SessionInfo::getCatalog(), Fragmenter_Namespace::InsertDataLoader::getLeafCount(), Catalog_Namespace::Catalog::getMetadataForTable(), inline_fixed_encoding_null_val(), anonymous_namespace{RelAlgExecutor.cpp}::insert_one_dict_str(), Fragmenter_Namespace::InsertDataLoader::insertData(), is_null(), SQLTypeInfo::is_string(), kARRAY, kBIGINT, kBOOLEAN, kCAST, kCHAR, kDATE, kDECIMAL, kDOUBLE, kENCODING_DICT, kENCODING_NONE, kFLOAT, kINT, kLINESTRING, kMULTILINESTRING, kMULTIPOINT, kMULTIPOLYGON, kNUMERIC, kPOINT, kPOLYGON, kSMALLINT, kTEXT, kTIME, kTIMESTAMP, kTINYINT, kVARCHAR, DataBlockPtr::numbersPtr, Fragmenter_Namespace::InsertData::numRows, anonymous_namespace{TypedDataAccessors.h}::put_null(), anonymous_namespace{TypedDataAccessors.h}::put_null_array(), DataBlockPtr::stringsPtr, Fragmenter_Namespace::InsertData::tableId, and to_string().

2863  {
2864  // Note: We currently obtain an executor for this method, but we do not need it.
2865  // Therefore, we skip the executor state setup in the regular execution path. In the
2866  // future, we will likely want to use the executor to evaluate expressions in the insert
2867  // statement.
2868 
2869  const auto& values_lists = query.get_values_lists();
2870  const int table_id = query.get_result_table_id();
2871  const auto& col_id_list = query.get_result_col_list();
2872  size_t rows_number = values_lists.size();
2873  size_t leaf_count = inserter.getLeafCount();
2874  const auto& catalog = session.getCatalog();
2875  const auto td = catalog.getMetadataForTable(table_id);
2876  CHECK(td);
2877  size_t rows_per_leaf = rows_number;
2878  if (td->nShards == 0) {
2879  rows_per_leaf =
2880  ceil(static_cast<double>(rows_number) / static_cast<double>(leaf_count));
2881  }
2882  auto max_number_of_rows_per_package =
2883  std::max(size_t(1), std::min(rows_per_leaf, size_t(64 * 1024)));
2884 
2885  std::vector<const ColumnDescriptor*> col_descriptors;
2886  std::vector<int> col_ids;
2887  std::unordered_map<int, std::unique_ptr<uint8_t[]>> col_buffers;
2888  std::unordered_map<int, std::vector<std::string>> str_col_buffers;
2889  std::unordered_map<int, std::vector<ArrayDatum>> arr_col_buffers;
2890  std::unordered_map<int, int> sequential_ids;
2891 
2892  for (const int col_id : col_id_list) {
2893  const auto cd = get_column_descriptor({catalog.getDatabaseId(), table_id, col_id});
2894  const auto col_enc = cd->columnType.get_compression();
2895  if (cd->columnType.is_string()) {
2896  switch (col_enc) {
2897  case kENCODING_NONE: {
2898  auto it_ok =
2899  str_col_buffers.insert(std::make_pair(col_id, std::vector<std::string>{}));
2900  CHECK(it_ok.second);
2901  break;
2902  }
2903  case kENCODING_DICT: {
2904  const auto dd = catalog.getMetadataForDict(cd->columnType.get_comp_param());
2905  CHECK(dd);
2906  const auto it_ok = col_buffers.emplace(
2907  col_id,
2908  std::make_unique<uint8_t[]>(cd->columnType.get_size() *
2909  max_number_of_rows_per_package));
2910  CHECK(it_ok.second);
2911  break;
2912  }
2913  default:
2914  CHECK(false);
2915  }
2916  } else if (cd->columnType.is_geometry()) {
2917  auto it_ok =
2918  str_col_buffers.insert(std::make_pair(col_id, std::vector<std::string>{}));
2919  CHECK(it_ok.second);
2920  } else if (cd->columnType.is_array()) {
2921  auto it_ok =
2922  arr_col_buffers.insert(std::make_pair(col_id, std::vector<ArrayDatum>{}));
2923  CHECK(it_ok.second);
2924  } else {
2925  const auto it_ok = col_buffers.emplace(
2926  col_id,
2927  std::unique_ptr<uint8_t[]>(new uint8_t[cd->columnType.get_logical_size() *
2928  max_number_of_rows_per_package]()));
2929  CHECK(it_ok.second);
2930  }
2931  col_descriptors.push_back(cd);
2932  sequential_ids[col_id] = col_ids.size();
2933  col_ids.push_back(col_id);
2934  }
2935 
2936  Executor::clearExternalCaches(true, td, catalog.getDatabaseId());
2937 
2938  size_t start_row = 0;
2939  size_t rows_left = rows_number;
2940  while (rows_left != 0) {
2941  // clear the buffers
2942  for (const auto& kv : col_buffers) {
2943  memset(kv.second.get(), 0, max_number_of_rows_per_package);
2944  }
2945  for (auto& kv : str_col_buffers) {
2946  kv.second.clear();
2947  }
2948  for (auto& kv : arr_col_buffers) {
2949  kv.second.clear();
2950  }
2951 
2952  auto package_size = std::min(rows_left, max_number_of_rows_per_package);
2953  // Note: if there will be use cases with batch inserts with lots of rows, it might be
2954  // more efficient to do the loops below column by column instead of row by row.
2955  // But for now I consider such a refactoring not worth investigating, as we have more
2956  // efficient ways to insert many rows anyway.
2957  for (size_t row_idx = 0; row_idx < package_size; ++row_idx) {
2958  const auto& values_list = values_lists[row_idx + start_row];
2959  for (size_t col_idx = 0; col_idx < col_descriptors.size(); ++col_idx) {
2960  CHECK(values_list.size() == col_descriptors.size());
2961  auto col_cv =
2962  dynamic_cast<const Analyzer::Constant*>(values_list[col_idx]->get_expr());
2963  if (!col_cv) {
2964  auto col_cast =
2965  dynamic_cast<const Analyzer::UOper*>(values_list[col_idx]->get_expr());
2966  CHECK(col_cast);
2967  CHECK_EQ(kCAST, col_cast->get_optype());
2968  col_cv = dynamic_cast<const Analyzer::Constant*>(col_cast->get_operand());
2969  }
2970  CHECK(col_cv);
2971  const auto cd = col_descriptors[col_idx];
2972  auto col_datum = col_cv->get_constval();
2973  auto col_type = cd->columnType.get_type();
2974  uint8_t* col_data_bytes{nullptr};
2975  if (!cd->columnType.is_array() && !cd->columnType.is_geometry() &&
2976  (!cd->columnType.is_string() ||
2977  cd->columnType.get_compression() == kENCODING_DICT)) {
2978  const auto col_data_bytes_it = col_buffers.find(col_ids[col_idx]);
2979  CHECK(col_data_bytes_it != col_buffers.end());
2980  col_data_bytes = col_data_bytes_it->second.get();
2981  }
2982  switch (col_type) {
2983  case kBOOLEAN: {
2984  auto col_data = reinterpret_cast<int8_t*>(col_data_bytes);
2985  auto null_bool_val =
2986  col_datum.boolval == inline_fixed_encoding_null_val(cd->columnType);
2987  col_data[row_idx] = col_cv->get_is_null() || null_bool_val
2988  ? inline_fixed_encoding_null_val(cd->columnType)
2989  : (col_datum.boolval ? 1 : 0);
2990  break;
2991  }
2992  case kTINYINT: {
2993  auto col_data = reinterpret_cast<int8_t*>(col_data_bytes);
2994  col_data[row_idx] = col_cv->get_is_null()
2995  ? inline_fixed_encoding_null_val(cd->columnType)
2996  : col_datum.tinyintval;
2997  break;
2998  }
2999  case kSMALLINT: {
3000  auto col_data = reinterpret_cast<int16_t*>(col_data_bytes);
3001  col_data[row_idx] = col_cv->get_is_null()
3002  ? inline_fixed_encoding_null_val(cd->columnType)
3003  : col_datum.smallintval;
3004  break;
3005  }
3006  case kINT: {
3007  auto col_data = reinterpret_cast<int32_t*>(col_data_bytes);
3008  col_data[row_idx] = col_cv->get_is_null()
3009  ? inline_fixed_encoding_null_val(cd->columnType)
3010  : col_datum.intval;
3011  break;
3012  }
3013  case kBIGINT:
3014  case kDECIMAL:
3015  case kNUMERIC: {
3016  auto col_data = reinterpret_cast<int64_t*>(col_data_bytes);
3017  col_data[row_idx] = col_cv->get_is_null()
3018  ? inline_fixed_encoding_null_val(cd->columnType)
3019  : col_datum.bigintval;
3020  break;
3021  }
3022  case kFLOAT: {
3023  auto col_data = reinterpret_cast<float*>(col_data_bytes);
3024  col_data[row_idx] = col_datum.floatval;
3025  break;
3026  }
3027  case kDOUBLE: {
3028  auto col_data = reinterpret_cast<double*>(col_data_bytes);
3029  col_data[row_idx] = col_datum.doubleval;
3030  break;
3031  }
3032  case kTEXT:
3033  case kVARCHAR:
3034  case kCHAR: {
3035  switch (cd->columnType.get_compression()) {
3036  case kENCODING_NONE:
3037  str_col_buffers[col_ids[col_idx]].push_back(
3038  col_datum.stringval ? *col_datum.stringval : "");
3039  break;
3040  case kENCODING_DICT: {
3041  switch (cd->columnType.get_size()) {
3042  case 1:
3044  &reinterpret_cast<uint8_t*>(col_data_bytes)[row_idx],
3045  cd,
3046  col_cv,
3047  catalog);
3048  break;
3049  case 2:
3051  &reinterpret_cast<uint16_t*>(col_data_bytes)[row_idx],
3052  cd,
3053  col_cv,
3054  catalog);
3055  break;
3056  case 4:
3058  &reinterpret_cast<int32_t*>(col_data_bytes)[row_idx],
3059  cd,
3060  col_cv,
3061  catalog);
3062  break;
3063  default:
3064  CHECK(false);
3065  }
3066  break;
3067  }
3068  default:
3069  CHECK(false);
3070  }
3071  break;
3072  }
3073  case kTIME:
3074  case kTIMESTAMP:
3075  case kDATE: {
3076  auto col_data = reinterpret_cast<int64_t*>(col_data_bytes);
3077  col_data[row_idx] = col_cv->get_is_null()
3078  ? inline_fixed_encoding_null_val(cd->columnType)
3079  : col_datum.bigintval;
3080  break;
3081  }
3082  case kARRAY: {
3083  const auto is_null = col_cv->get_is_null();
3084  const auto size = cd->columnType.get_size();
3085  const SQLTypeInfo elem_ti = cd->columnType.get_elem_type();
3086  // POINT coords: [un]compressed coords always need to be encoded, even if NULL
3087  const auto is_point_coords =
3088  (cd->isGeoPhyCol && elem_ti.get_type() == kTINYINT);
3089  if (is_null && !is_point_coords) {
3090  if (size > 0) {
3091  int8_t* buf = (int8_t*)checked_malloc(size);
3092  put_null_array(static_cast<void*>(buf), elem_ti, "");
3093  for (int8_t* p = buf + elem_ti.get_size(); (p - buf) < size;
3094  p += elem_ti.get_size()) {
3095  put_null(static_cast<void*>(p), elem_ti, "");
3096  }
3097  arr_col_buffers[col_ids[col_idx]].emplace_back(size, buf, is_null);
3098  } else {
3099  arr_col_buffers[col_ids[col_idx]].emplace_back(0, nullptr, is_null);
3100  }
3101  break;
3102  }
3103  const auto l = col_cv->get_value_list();
3104  size_t len = l.size() * elem_ti.get_size();
3105  if (size > 0 && static_cast<size_t>(size) != len) {
3106  throw std::runtime_error("Array column " + cd->columnName + " expects " +
3107  std::to_string(size / elem_ti.get_size()) +
3108  " values, " + "received " +
3109  std::to_string(l.size()));
3110  }
3111  if (elem_ti.is_string()) {
3112  CHECK(kENCODING_DICT == elem_ti.get_compression());
3113  CHECK(4 == elem_ti.get_size());
3114 
3115  int8_t* buf = (int8_t*)checked_malloc(len);
3116  int32_t* p = reinterpret_cast<int32_t*>(buf);
3117 
3118  int elemIndex = 0;
3119  for (auto& e : l) {
3120  auto c = std::dynamic_pointer_cast<Analyzer::Constant>(e);
3121  CHECK(c);
3123  &p[elemIndex], cd->columnName, elem_ti, c.get(), catalog);
3124  elemIndex++;
3125  }
3126  arr_col_buffers[col_ids[col_idx]].push_back(ArrayDatum(len, buf, is_null));
3127 
3128  } else {
3129  int8_t* buf = (int8_t*)checked_malloc(len);
3130  int8_t* p = buf;
3131  for (auto& e : l) {
3132  auto c = std::dynamic_pointer_cast<Analyzer::Constant>(e);
3133  CHECK(c);
3134  p = append_datum(p, c->get_constval(), elem_ti);
3135  CHECK(p);
3136  }
3137  arr_col_buffers[col_ids[col_idx]].push_back(ArrayDatum(len, buf, is_null));
3138  }
3139  break;
3140  }
3141  case kPOINT:
3142  case kMULTIPOINT:
3143  case kLINESTRING:
3144  case kMULTILINESTRING:
3145  case kPOLYGON:
3146  case kMULTIPOLYGON:
3147  if (col_datum.stringval && col_datum.stringval->empty()) {
3148  throw std::runtime_error(
3149  "Empty values are not allowed for geospatial column \"" +
3150  cd->columnName + "\"");
3151  } else {
3152  str_col_buffers[col_ids[col_idx]].push_back(
3153  col_datum.stringval ? *col_datum.stringval : "");
3154  }
3155  break;
3156  default:
3157  CHECK(false);
3158  }
3159  }
3160  }
3161  start_row += package_size;
3162  rows_left -= package_size;
3163 
3165  insert_data.databaseId = catalog.getCurrentDB().dbId;
3166  insert_data.tableId = table_id;
3167  insert_data.data.resize(col_ids.size());
3168  insert_data.columnIds = col_ids;
3169  for (const auto& kv : col_buffers) {
3170  DataBlockPtr p;
3171  p.numbersPtr = reinterpret_cast<int8_t*>(kv.second.get());
3172  insert_data.data[sequential_ids[kv.first]] = p;
3173  }
3174  for (auto& kv : str_col_buffers) {
3175  DataBlockPtr p;
3176  p.stringsPtr = &kv.second;
3177  insert_data.data[sequential_ids[kv.first]] = p;
3178  }
3179  for (auto& kv : arr_col_buffers) {
3180  DataBlockPtr p;
3181  p.arraysPtr = &kv.second;
3182  insert_data.data[sequential_ids[kv.first]] = p;
3183  }
3184  insert_data.numRows = package_size;
3185  auto data_memory_holder = import_export::fill_missing_columns(&catalog, insert_data);
3186  inserter.insertData(session, insert_data);
3187  }
3188 
3189  auto rs = std::make_shared<ResultSet>(TargetInfoList{},
3192  executor_->getRowSetMemoryOwner(),
3193  0,
3194  0);
3195  std::vector<TargetMetaInfo> empty_targets;
3196  return {rs, empty_targets};
3197 }
#define CHECK_EQ(x, y)
Definition: Logger.h:301
HOST DEVICE int get_size() const
Definition: sqltypes.h:403
int8_t * append_datum(int8_t *buf, const Datum &d, const SQLTypeInfo &ti)
Definition: Datum.cpp:580
Definition: sqltypes.h:76
std::vector< std::string > * stringsPtr
Definition: sqltypes.h:234
std::vector< ArrayDatum > * arraysPtr
Definition: sqltypes.h:235
std::vector< std::unique_ptr< TypedImportBuffer > > fill_missing_columns(const Catalog_Namespace::Catalog *cat, Fragmenter_Namespace::InsertData &insert_data)
Definition: Importer.cpp:6141
int64_t insert_one_dict_str(T *col_data, const std::string &columnName, const SQLTypeInfo &columnType, const Analyzer::Constant *col_cv, const Catalog_Namespace::Catalog &catalog)
Definition: sqldefs.h:48
std::vector< TargetInfo > TargetInfoList
HOST DEVICE SQLTypes get_type() const
Definition: sqltypes.h:391
void insertData(const Catalog_Namespace::SessionInfo &session_info, InsertData &insert_data)
std::string to_string(char const *&&v)
int tableId
identifies the database into which the data is being inserted
Definition: Fragmenter.h:70
std::conditional_t< is_cuda_compiler(), DeviceArrayDatum, HostArrayDatum > ArrayDatum
Definition: sqltypes.h:229
size_t numRows
a vector of column ids for the row(s) being inserted
Definition: Fragmenter.h:72
CONSTEXPR DEVICE bool is_null(const T &value)
void * checked_malloc(const size_t size)
Definition: checked_alloc.h:45
void put_null_array(void *ndptr, const SQLTypeInfo &ntype, const std::string col_name)
const ColumnDescriptor * get_column_descriptor(const shared::ColumnKey &column_key)
Definition: Execute.h:213
void put_null(void *ndptr, const SQLTypeInfo &ntype, const std::string col_name)
Definition: sqltypes.h:79
Definition: sqltypes.h:80
HOST DEVICE EncodingType get_compression() const
Definition: sqltypes.h:399
std::vector< DataBlockPtr > data
the number of rows being inserted
Definition: Fragmenter.h:73
Catalog & getCatalog() const
Definition: SessionInfo.h:75
Definition: sqltypes.h:68
#define CHECK(condition)
Definition: Logger.h:291
static void clearExternalCaches(bool for_update, const TableDescriptor *td, const int current_db_id)
Definition: Execute.h:438
int64_t inline_fixed_encoding_null_val(const SQL_TYPE_INFO &ti)
The data to be inserted using the fragment manager.
Definition: Fragmenter.h:68
Definition: sqltypes.h:72
SQLTypeInfo columnType
const TableDescriptor * getMetadataForTable(const std::string &tableName, const bool populateFragmenter=true) const
Returns a pointer to a const TableDescriptor struct matching the provided tableName.
bool is_string() const
Definition: sqltypes.h:559
int8_t * numbersPtr
Definition: sqltypes.h:233
SQLTypeInfo get_elem_type() const
Definition: sqltypes.h:975
std::vector< int > columnIds
identifies the table into which the data is being inserted
Definition: Fragmenter.h:71
Executor * executor_

+ Here is the call graph for this function:

ExecutionResult RelAlgExecutor::executeSort ( const RelSort * sort,
const CompilationOptions & co,
const ExecutionOptions & eo,
RenderInfo * render_info,
const int64_t  queue_time_ms 
)
private

Definition at line 3224 of file RelAlgExecutor.cpp.

References SpeculativeTopNBlacklist::add(), addTemporaryTable(), anonymous_namespace{QueryMemoryDescriptor.cpp}::any_of(), QueryPlanDagExtractor::applyLimitClauseToCacheKey(), build_render_targets(), canUseResultsetCache(), CHECK, CHECK_EQ, anonymous_namespace{RelAlgExecutor.cpp}::check_sort_node_source_constraint(), RelSort::collationCount(), CPU, createSortInputWorkUnit(), DEBUG_TIMER, CompilationOptions::device_type, executeWorkUnit(), executor_, anonymous_namespace{RelAlgExecutor.cpp}::first_oe_is_desc(), g_cluster, anonymous_namespace{RelAlgExecutor.cpp}::get_limit_value(), ExecutionResult::getDataPtr(), RelAlgNode::getId(), RelAlgNode::getInput(), RelSort::getLimit(), RelSort::getOffset(), RelSort::getOrderEntries(), GPU, anonymous_namespace{RelAlgExecutor.cpp}::has_valid_query_plan_dag(), hasStepForUnion(), anonymous_namespace{RelAlgExecutor.cpp}::is_none_encoded_text(), RelSort::isEmptyResult(), ExecutionOptions::just_validate, leaf_results_, anonymous_namespace{RelAlgExecutor.cpp}::node_is_aggregate(), run_benchmark_import::result, RelAlgNode::setOutputMetainfo(), gpu_enabled::sort(), speculative_topn_blacklist_, SpeculativeTopN, temporary_tables_, use_speculative_top_n(), and VLOG.

Referenced by executeRelAlgStep().

3228  {
// Executes a RelSort step and returns the sorted / limited result.
// Two paths are visible below:
//  1. A result for this node id already exists in leaf_results_: it is
//     sorted and trimmed in place and returned.
//  2. Otherwise the sort's input is taken from the resultset cache or executed
//     via executeWorkUnit(), then ORDER BY / OFFSET / LIMIT are applied.
// If SpeculativeTopNFailed is thrown, the single group-by expression is added
// to speculative_topn_blacklist_ and the query is run a second time.
// NOTE(review): some numbered lines are absent from this listing (e.g. 3230,
// 3294, 3326); `sort_algorithm` used below is defined on one of them.
3229  auto timer = DEBUG_TIMER(__func__);
3231  const auto source = sort->getInput(0);
3232  const bool is_aggregate = node_is_aggregate(source);
3233  auto it = leaf_results_.find(sort->getId());
3234  auto order_entries = sort->getOrderEntries();
3235  if (it != leaf_results_.end()) {
3236  // Add any transient string literals to the sdp on the agg
3237  const auto source_work_unit = createSortInputWorkUnit(sort, order_entries, eo);
3238  executor_->addTransientStringLiterals(source_work_unit.exe_unit,
3239  executor_->row_set_mem_owner_);
3240  // Handle push-down for LIMIT for multi-node
3241  auto& aggregated_result = it->second;
3242  auto& result_rows = aggregated_result.rs;
3243  auto limit = sort->getLimit();
3244  const size_t offset = sort->getOffset();
3245  if (limit || offset) {
// Sort only when there are order entries; limit + offset is passed as the
// top-n bound, then the offset rows are dropped and the limit is applied.
3246  if (!order_entries.empty()) {
3247  result_rows->sort(
3248  order_entries, get_limit_value(limit) + offset, co.device_type, executor_);
3249  }
3250  result_rows->dropFirstN(offset);
3251  if (limit) {
3252  result_rows->keepFirstN(get_limit_value(limit));
3253  }
3254  }
3255 
3256  if (render_info) {
3257  // We've hit a sort step that is the very last step
3258  // in a distributed render query. We'll fill in the render targets
3259  // since we have all that data needed to do so. This is normally
3260  // done in executeWorkUnit, but that is bypassed in this case.
3261  build_render_targets(*render_info,
3262  source_work_unit.exe_unit.target_exprs,
3263  aggregated_result.targets_meta);
3264  }
3265 
3266  ExecutionResult result(result_rows, aggregated_result.targets_meta);
3267  sort->setOutputMetainfo(aggregated_result.targets_meta);
3268 
3269  return result;
3270  }
3271 
// State shared with the lambda by reference so the catch handler below can
// read what the failed attempt recorded (group-by exprs, sort direction).
3272  std::list<std::shared_ptr<Analyzer::Expr>> groupby_exprs;
3273  bool is_desc{false};
3274  bool use_speculative_top_n_sort{false};
3275 
3276  auto execute_sort_query = [this,
3277  sort,
3278  &source,
3279  &is_aggregate,
3280  &eo,
3281  &co,
3282  render_info,
3283  queue_time_ms,
3284  &groupby_exprs,
3285  &is_desc,
3286  &order_entries,
3287  &use_speculative_top_n_sort]() -> ExecutionResult {
3288  std::optional<size_t> limit = sort->getLimit();
3289  const size_t offset = sort->getOffset();
3290  // check whether sort's input is cached
3291  auto source_node = sort->getInput(0);
3292  CHECK(source_node);
3293  ExecutionResult source_result{nullptr, {}};
3295  SortInfo sort_info{order_entries, sort_algorithm, limit, offset};
// The cache key is the input node's plan hash combined with the sort info.
3296  auto source_query_plan_dag = QueryPlanDagExtractor::applyLimitClauseToCacheKey(
3297  source_node->getQueryPlanDagHash(), sort_info);
3298  bool enable_resultset_recycler = canUseResultsetCache(eo, render_info);
3299  if (enable_resultset_recycler && has_valid_query_plan_dag(source_node) &&
3300  !sort->isEmptyResult()) {
3301  if (auto cached_resultset =
3302  executor_->getResultSetRecyclerHolder().getCachedQueryResultSet(
3303  source_query_plan_dag)) {
3304  CHECK(cached_resultset->canUseSpeculativeTopNSort());
3305  VLOG(1) << "recycle resultset of the root node " << source_node->getRelNodeDagId()
3306  << " from resultset cache";
3307  source_result =
3308  ExecutionResult{cached_resultset, cached_resultset->getTargetMetaInfo()};
// Register the cached result as a temporary table under the negated node id
// unless an entry for that id already exists.
3309  if (temporary_tables_.find(-source_node->getId()) == temporary_tables_.end()) {
3310  addTemporaryTable(-source_node->getId(), cached_resultset);
3311  }
3312  use_speculative_top_n_sort = *cached_resultset->canUseSpeculativeTopNSort() &&
3313  co.device_type == ExecutorDeviceType::GPU;
3314  source_node->setOutputMetainfo(cached_resultset->getTargetMetaInfo());
3315  sort->setOutputMetainfo(source_node->getOutputMetainfo());
3316  }
3317  }
// Cache miss (or cache not usable): execute the sort's input.
3318  if (!source_result.getDataPtr()) {
3319  const auto source_work_unit = createSortInputWorkUnit(sort, order_entries, eo);
3320  is_desc = first_oe_is_desc(order_entries);
3321  ExecutionOptions eo_copy = eo;
3322  CompilationOptions co_copy = co;
// An empty-result sort is executed in validate-only mode.
3323  eo_copy.just_validate = eo.just_validate || sort->isEmptyResult();
3324  if (hasStepForUnion() &&
3325  boost::algorithm::any_of(source->getOutputMetainfo(), is_none_encoded_text)) {
// NOTE(review): line 3326 is missing from this listing; per the log message
// it presumably switches co_copy to CPU - confirm against the source file.
3327  VLOG(1) << "Punt sort's input query to CPU: detect union(-all) of none-encoded "
3328  "text column";
3329  }
3330 
3331  groupby_exprs = source_work_unit.exe_unit.groupby_exprs;
3332  source_result = executeWorkUnit(source_work_unit,
3333  source->getOutputMetainfo(),
3334  is_aggregate,
3335  co_copy,
3336  eo_copy,
3337  render_info,
3338  queue_time_ms);
3339  use_speculative_top_n_sort =
3340  source_result.getDataPtr() && source_result.getRows()->hasValidBuffer() &&
3341  use_speculative_top_n(source_work_unit.exe_unit,
3342  source_result.getRows()->getQueryMemDesc());
3343  }
// In-situ render results and filter-push-down results are returned as-is,
// without the sort / limit handling below.
3344  if (render_info && render_info->isInSitu()) {
3345  return source_result;
3346  }
3347  if (source_result.isFilterPushDownEnabled()) {
3348  return source_result;
3349  }
3350  auto rows_to_sort = source_result.getRows();
3351  if (eo.just_explain) {
3352  return {rows_to_sort, {}};
3353  }
3354  auto const limit_val = get_limit_value(limit);
// The explicit sort is skipped when there is no collation, when the result is
// known to be empty, or when the speculative top-n path is in use.
3355  if (sort->collationCount() != 0 && !rows_to_sort->definitelyHasNoRows() &&
3356  !use_speculative_top_n_sort) {
3357  const size_t top_n = limit_val + offset;
3358  rows_to_sort->sort(order_entries, top_n, co.device_type, executor_);
3359  }
3360  if (limit || offset) {
// With g_cluster set and no collation, the offset rows are kept (limit + offset
// rows retained) unless the offset covers the whole result, in which case the
// rows are dropped. Otherwise: drop the offset, then keep at most `limit` rows.
3361  if (g_cluster && sort->collationCount() == 0) {
3362  if (offset >= rows_to_sort->rowCount()) {
3363  rows_to_sort->dropFirstN(offset);
3364  } else {
3365  rows_to_sort->keepFirstN(limit_val + offset);
3366  }
3367  } else {
3368  rows_to_sort->dropFirstN(offset);
3369  if (limit) {
3370  rows_to_sort->keepFirstN(limit_val);
3371  }
3372  }
3373  }
3374  return {rows_to_sort, source_result.getTargetsMeta()};
3375  };
3376 
// First attempt; on a speculative top-n failure, blacklist the (single)
// group-by expression with its direction and run the query once more.
3377  try {
3378  return execute_sort_query();
3379  } catch (const SpeculativeTopNFailed& e) {
3380  CHECK_EQ(size_t(1), groupby_exprs.size());
3381  CHECK(groupby_exprs.front());
3382  speculative_topn_blacklist_.add(groupby_exprs.front(), is_desc);
3383  return execute_sort_query();
3384  }
3385 }
void setOutputMetainfo(std::vector< TargetMetaInfo > targets_metainfo) const
Definition: RelAlgDag.h:850
#define CHECK_EQ(x, y)
Definition: Logger.h:301
size_t getOffset() const
Definition: RelAlgDag.h:2228
bool has_valid_query_plan_dag(const RelAlgNode *node)
size_t get_limit_value(std::optional< size_t > limit)
std::list< Analyzer::OrderEntry > getOrderEntries() const
Definition: RelAlgDag.h:2264
static size_t applyLimitClauseToCacheKey(size_t cache_key, SortInfo const &sort_info)
bool is_none_encoded_text(TargetMetaInfo const &target_meta_info)
static SpeculativeTopNBlacklist speculative_topn_blacklist_
TemporaryTables temporary_tables_
bool use_speculative_top_n(const RelAlgExecutionUnit &ra_exe_unit, const QueryMemoryDescriptor &query_mem_desc)
DEVICE void sort(ARGS &&...args)
Definition: gpu_enabled.h:105
void addTemporaryTable(const int table_id, const ResultSetPtr &result)
void check_sort_node_source_constraint(const RelSort *sort)
std::unordered_map< unsigned, AggregatedResult > leaf_results_
const ResultSetPtr & getDataPtr() const
bool hasStepForUnion() const
unsigned getId() const
Definition: RelAlgDag.h:869
void build_render_targets(RenderInfo &render_info, const std::vector< Analyzer::Expr * > &work_unit_target_exprs, const std::vector< TargetMetaInfo > &targets_meta)
const RelAlgNode * getInput(const size_t idx) const
Definition: RelAlgDag.h:877
ExecutorDeviceType device_type
bool node_is_aggregate(const RelAlgNode *ra)
bool isEmptyResult() const
Definition: RelAlgDag.h:2222
ExecutionResult executeWorkUnit(const WorkUnit &work_unit, const std::vector< TargetMetaInfo > &targets_meta, const bool is_agg, const CompilationOptions &co_in, const ExecutionOptions &eo_in, RenderInfo *, const int64_t queue_time_ms, const std::optional< size_t > previous_count=std::nullopt)
size_t collationCount() const
Definition: RelAlgDag.h:2211
bool canUseResultsetCache(const ExecutionOptions &eo, RenderInfo *render_info) const
#define CHECK(condition)
Definition: Logger.h:291
#define DEBUG_TIMER(name)
Definition: Logger.h:412
bool g_cluster
void add(const std::shared_ptr< Analyzer::Expr > expr, const bool desc)
bool any_of(std::vector< Analyzer::Expr * > const &target_exprs)
std::optional< size_t > getLimit() const
Definition: RelAlgDag.h:2226
Executor * executor_
#define VLOG(n)
Definition: Logger.h:388
bool first_oe_is_desc(const std::list< Analyzer::OrderEntry > &order_entries)
WorkUnit createSortInputWorkUnit(const RelSort *, std::list< Analyzer::OrderEntry > &order_entries, const ExecutionOptions &eo)

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

ExecutionResult RelAlgExecutor::executeTableFunction ( const RelTableFunction * table_func,
const CompilationOptions & co_in,
const ExecutionOptions & eo,
const int64_t  queue_time_ms 
)
private

Definition at line 2357 of file RelAlgExecutor.cpp.

References addTemporaryTable(), canUseResultsetCache(), CHECK, createTableFunctionWorkUnit(), DEBUG_TIMER, Executor::ERR_OUT_OF_GPU_MEM, executor_, g_allow_auto_resultset_caching, g_auto_resultset_caching_threshold, g_cluster, g_enable_table_functions, get_table_infos(), QueryExecutionError::getErrorCode(), getGlobalQueryHint(), RelAlgNode::getQueryPlanDagHash(), RelAlgNode::getRelNodeDagId(), ScanNodeTableKeyCollector::getScanNodeTableKey(), GPU, handlePersistentError(), anonymous_namespace{RelAlgExecutor.cpp}::has_valid_query_plan_dag(), hasStepForUnion(), INJECT_TIMER, is_validate_or_explain_query(), ExecutionOptions::just_explain, ExecutionOptions::keep_result, kKeepTableFuncResult, run_benchmark_import::result, target_exprs_owned_, timer_start(), timer_stop(), and VLOG.

Referenced by executeRelAlgStep().

2360  {
// Executes a table-function node and returns its result.
// Throws std::runtime_error when g_cluster is set or when
// g_enable_table_functions is false. A cached resultset keyed by the node's
// query plan DAG hash is returned directly when available; otherwise the
// function is executed and its result may be stored in the resultset recycler.
// NOTE(review): some numbered lines are absent from this listing (e.g. 2361,
// 2384, 2411-2412, 2419, 2421, 2438), so a few expressions below are shown
// only in part.
2362  auto timer = DEBUG_TIMER(__func__);
2363 
2364  auto co = co_in;
2365 
2366  if (g_cluster) {
2367  throw std::runtime_error("Table functions not supported in distributed mode yet");
2368  }
2369  if (!g_enable_table_functions) {
2370  throw std::runtime_error("Table function support is disabled");
2371  }
2372  auto table_func_work_unit = createTableFunctionWorkUnit(
2373  table_func,
2374  eo.just_explain,
2375  /*is_gpu = */ co.device_type == ExecutorDeviceType::GPU);
2376  const auto body = table_func_work_unit.body;
2377  CHECK(body);
2378 
2379  const auto table_infos =
2380  get_table_infos(table_func_work_unit.exe_unit.input_descs, executor_);
2381 
// Placeholder result with no targets; replaced below by either the cached or
// the freshly computed resultset.
2382  ExecutionResult result{std::make_shared<ResultSet>(std::vector<TargetInfo>{},
2383  co.device_type,
2385  nullptr,
2386  executor_->blockSize(),
2387  executor_->gridSize()),
2388  {}};
2389 
2390  auto global_hint = getGlobalQueryHint();
2391  auto use_resultset_recycler = canUseResultsetCache(eo, nullptr);
// Cache hit: register the cached result as a temporary table under the negated
// body node id and return it without executing the table function.
2392  if (use_resultset_recycler && has_valid_query_plan_dag(table_func)) {
2393  auto cached_resultset =
2394  executor_->getResultSetRecyclerHolder().getCachedQueryResultSet(
2395  table_func->getQueryPlanDagHash());
2396  if (cached_resultset) {
2397  VLOG(1) << "recycle table function's resultset of the root node "
2398  << table_func->getRelNodeDagId() << " from resultset cache";
2399  result = {cached_resultset, cached_resultset->getTargetMetaInfo()};
2400  addTemporaryTable(-body->getId(), result.getDataPtr());
2401  return result;
2402  }
2403  }
2404 
// The execution time measured here is recorded on the resultset if it is
// cached below.
2405  auto query_exec_time_begin = timer_start();
2406  try {
2407  result = {executor_->executeTableFunction(
2408  table_func_work_unit.exe_unit, table_infos, co, eo),
2409  body->getOutputMetainfo()};
2410  } catch (const QueryExecutionError& e) {
// NOTE(review): lines 2411-2412 are missing from this listing; only the
// out-of-memory rethrow is visible here.
2413  throw std::runtime_error("Table function ran out of memory during execution");
2414  }
2415  auto query_exec_time = timer_stop(query_exec_time_begin);
2416  result.setQueueTime(queue_time_ms);
2417  auto resultset_ptr = result.getDataPtr();
2418  auto allow_auto_caching_resultset = resultset_ptr && resultset_ptr->hasValidBuffer() &&
2420  resultset_ptr->getBufferSizeBytes(co.device_type) <=
// NOTE(review): getGlobalQueryHint() returns std::optional; no has_value()
// check is visible before this dereference - confirm it is always engaged.
2422  bool keep_result = global_hint->isHintRegistered(QueryHint::kKeepTableFuncResult);
// Cache the result when the recycler is usable, the query has no union step,
// and either the keep-result hint is registered or auto-caching applies.
2423  if (use_resultset_recycler && (keep_result || allow_auto_caching_resultset) &&
2424  !hasStepForUnion()) {
2425  resultset_ptr->setExecTime(query_exec_time);
2426  resultset_ptr->setQueryPlanHash(table_func_work_unit.exe_unit.query_plan_dag_hash);
2427  resultset_ptr->setTargetMetaInfo(body->getOutputMetainfo());
2428  auto input_table_keys = ScanNodeTableKeyCollector::getScanNodeTableKey(body);
2429  resultset_ptr->setInputTableKeys(std::move(input_table_keys));
2430  if (allow_auto_caching_resultset) {
2431  VLOG(1) << "Automatically keep table function's query resultset to recycler";
2432  }
2433  executor_->getResultSetRecyclerHolder().putQueryResultSetToCache(
2434  table_func_work_unit.exe_unit.query_plan_dag_hash,
2435  resultset_ptr->getInputTableKeys(),
2436  resultset_ptr,
2437  resultset_ptr->getBufferSizeBytes(co.device_type),
2439  } else {
// Not cached: when eo.keep_result is set, log the reason the hint is ignored.
2440  if (eo.keep_result) {
2441  if (g_cluster) {
2442  VLOG(1) << "Query hint \'keep_table_function_result\' is ignored since we do not "
2443  "support resultset recycling on distributed mode";
2444  } else if (hasStepForUnion()) {
2445  VLOG(1) << "Query hint \'keep_table_function_result\' is ignored since a query "
2446  "has union-(all) operator";
2447  } else if (is_validate_or_explain_query(eo)) {
2448  VLOG(1) << "Query hint \'keep_table_function_result\' is ignored since a query "
2449  "is either validate or explain query";
2450  } else {
2451  VLOG(1) << "Query hint \'keep_table_function_result\' is ignored";
2452  }
2453  }
2454  }
2455 
2456  return result;
2457 }
int32_t getErrorCode() const
Definition: ErrorHandling.h:55
bool has_valid_query_plan_dag(const RelAlgNode *node)
TableFunctionWorkUnit createTableFunctionWorkUnit(const RelTableFunction *table_func, const bool just_explain, const bool is_gpu)
bool is_validate_or_explain_query(const ExecutionOptions &eo)
TypeR::rep timer_stop(Type clock_begin)
Definition: measure.h:48
void addTemporaryTable(const int table_id, const ResultSetPtr &result)
static std::unordered_set< size_t > getScanNodeTableKey(RelAlgNode const *rel_alg_node)
bool hasStepForUnion() const
size_t getQueryPlanDagHash() const
Definition: RelAlgDag.h:863
std::vector< std::shared_ptr< Analyzer::Expr > > target_exprs_owned_
#define INJECT_TIMER(DESC)
Definition: measure.h:96
bool g_allow_auto_resultset_caching
Definition: Execute.cpp:158
static const int32_t ERR_OUT_OF_GPU_MEM
Definition: Execute.h:1616
std::optional< RegisteredQueryHint > getGlobalQueryHint()
ExecutionResult executeTableFunction(const RelTableFunction *, const CompilationOptions &, const ExecutionOptions &, const int64_t queue_time_ms)
bool canUseResultsetCache(const ExecutionOptions &eo, RenderInfo *render_info) const
static void handlePersistentError(const int32_t error_code)
#define CHECK(condition)
Definition: Logger.h:291
std::vector< InputTableInfo > get_table_infos(const std::vector< InputDescriptor > &input_descs, Executor *executor)
#define DEBUG_TIMER(name)
Definition: Logger.h:412
bool g_cluster
size_t getRelNodeDagId() const
Definition: RelAlgDag.h:921
Executor * executor_
#define VLOG(n)
Definition: Logger.h:388
Type timer_start()
Definition: measure.h:42
size_t g_auto_resultset_caching_threshold
Definition: Execute.cpp:164
bool g_enable_table_functions
Definition: Execute.cpp:117

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

ExecutionResult RelAlgExecutor::executeUnion ( const RelLogicalUnion * logical_union,
const RaExecutionSequence & seq,
const CompilationOptions & co,
const ExecutionOptions & eo,
RenderInfo * render_info,
const int64_t  queue_time_ms 
)
private

Definition at line 2726 of file RelAlgExecutor.cpp.

References anonymous_namespace{QueryMemoryDescriptor.cpp}::any_of(), createUnionWorkUnit(), DEBUG_TIMER, executeWorkUnit(), RelLogicalUnion::getCompatibleMetainfoTypes(), RelAlgNode::getOutputMetainfo(), RelLogicalUnion::isAll(), isGeometry(), CompilationOptions::makeCpuOnly(), and RelAlgNode::setOutputMetainfo().

Referenced by executeRelAlgStep().

2731  {
// Executes a UNION ALL node by building a union work unit and running it
// through executeWorkUnit() with is_agg == false.
// Throws std::runtime_error for UNION without ALL, for mismatching input
// types (raised by getCompatibleMetainfoTypes()), and when any output column
// satisfies isGeometry.
2732  auto timer = DEBUG_TIMER(__func__);
2733  if (!logical_union->isAll()) {
2734  throw std::runtime_error("UNION without ALL is not supported yet.");
2735  }
2736  // Will throw a std::runtime_error if types don't match.
2737  logical_union->setOutputMetainfo(logical_union->getCompatibleMetainfoTypes());
2738  if (boost::algorithm::any_of(logical_union->getOutputMetainfo(), isGeometry)) {
2739  throw std::runtime_error("UNION does not support subqueries with geo-columns.");
2740  }
2741  auto work_unit = createUnionWorkUnit(logical_union, SortInfo(), eo);
2742  return executeWorkUnit(work_unit,
2743  logical_union->getOutputMetainfo(),
2744  false,
// NOTE(review): line 2745 (the CompilationOptions argument) is missing from
// this listing.
2746  eo,
2747  render_info,
2748  queue_time_ms);
2749 }
bool isAll() const
Definition: RelAlgDag.h:2770
void setOutputMetainfo(std::vector< TargetMetaInfo > targets_metainfo) const
Definition: RelAlgDag.h:850
static CompilationOptions makeCpuOnly(const CompilationOptions &in)
bool isGeometry(TargetMetaInfo const &target_meta_info)
WorkUnit createUnionWorkUnit(const RelLogicalUnion *, const SortInfo &, const ExecutionOptions &eo)
std::vector< TargetMetaInfo > getCompatibleMetainfoTypes() const
Definition: RelAlgDag.cpp:911
ExecutionResult executeWorkUnit(const WorkUnit &work_unit, const std::vector< TargetMetaInfo > &targets_meta, const bool is_agg, const CompilationOptions &co_in, const ExecutionOptions &eo_in, RenderInfo *, const int64_t queue_time_ms, const std::optional< size_t > previous_count=std::nullopt)
#define DEBUG_TIMER(name)
Definition: Logger.h:412
bool any_of(std::vector< Analyzer::Expr * > const &target_exprs)
const std::vector< TargetMetaInfo > & getOutputMetainfo() const
Definition: RelAlgDag.h:865

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void RelAlgExecutor::executeUpdate ( const RelAlgNode * node,
const CompilationOptions & co,
const ExecutionOptions & eo,
const int64_t  queue_time_ms 
)
private

Definition at line 2016 of file RelAlgExecutor.cpp.

References CompilationOptions::allow_lazy_fetch, CHECK, CHECK_EQ, Executor::clearExternalCaches(), computeWindow(), CPU, createCompoundWorkUnit(), createProjectWorkUnit(), DEBUG_TIMER, RelRexToStringConfig::defaults(), dml_transaction_parameters_, executor_, CompilationOptions::filter_on_deleted_column, get_table_infos(), get_temporary_table(), QueryExecutionError::getErrorCode(), getErrorMessageFromCode(), CompilationOptions::hoist_literals, leaf_results_, CompilationOptions::makeCpuOnly(), post_execution_callback_, StorageIOFacility::TransactionParameters::setInputSourceNode(), temporary_tables_, and StorageIOFacility::yieldUpdateCallback().

Referenced by executeRelAlgStep().

2019  {
2020  CHECK(node);
2021  auto timer = DEBUG_TIMER(__func__);
2022 
2023  auto co = co_in;
2024  co.hoist_literals = false; // disable literal hoisting as it interferes with dict
2025  // encoded string updates
2026 
2027  auto execute_update_for_node = [this, &co, &eo_in](const auto node,
2028  auto& work_unit,
2029  const bool is_aggregate) {
2030  auto table_descriptor = node->getModifiedTableDescriptor();
2031  CHECK(table_descriptor);
2032  if (node->isVarlenUpdateRequired() && !table_descriptor->hasDeletedCol) {
2033  throw std::runtime_error(
2034  "UPDATE queries involving variable length columns are only supported on tables "
2035  "with the vacuum attribute set to 'delayed'");
2036  }
2037 
2038  auto catalog = node->getModifiedTableCatalog();
2039  CHECK(catalog);
2040  Executor::clearExternalCaches(true, table_descriptor, catalog->getDatabaseId());
2041 
2043  std::make_unique<UpdateTransactionParameters>(table_descriptor,
2044  *catalog,
2045  node->getTargetColumns(),
2046  node->getOutputMetainfo(),
2047  node->isVarlenUpdateRequired());
2048 
2049  const auto table_infos = get_table_infos(work_unit.exe_unit, executor_);
2050 
2051  auto execute_update_ra_exe_unit =
2052  [this, &co, &eo_in, &table_infos, &table_descriptor, &node, catalog](
2053  const RelAlgExecutionUnit& ra_exe_unit, const bool is_aggregate) {
2055 
2056  auto eo = eo_in;
2057  if (dml_transaction_parameters_->tableIsTemporary()) {
2058  eo.output_columnar_hint = true;
2059  co_project.allow_lazy_fetch = false;
2060  co_project.filter_on_deleted_column =
2061  false; // project the entire delete column for columnar update
2062  }
2063 
2064  auto update_transaction_parameters = dynamic_cast<UpdateTransactionParameters*>(
2065  dml_transaction_parameters_.get());
2066  update_transaction_parameters->setInputSourceNode(node);
2067  CHECK(update_transaction_parameters);
2068  auto update_callback = yieldUpdateCallback(*update_transaction_parameters);
2069  try {
2070  auto table_update_metadata =
2071  executor_->executeUpdate(ra_exe_unit,
2072  table_infos,
2073  table_descriptor,
2074  co_project,
2075  eo,
2076  *catalog,
2077  executor_->row_set_mem_owner_,
2078  update_callback,
2079  is_aggregate);
2080  post_execution_callback_ = [table_update_metadata, this, catalog]() {
2081  dml_transaction_parameters_->finalizeTransaction(*catalog);
2082  TableOptimizer table_optimizer{
2083  dml_transaction_parameters_->getTableDescriptor(), executor_, *catalog};
2084  table_optimizer.vacuumFragmentsAboveMinSelectivity(table_update_metadata);
2085  };
2086  } catch (const QueryExecutionError& e) {
2087  throw std::runtime_error(getErrorMessageFromCode(e.getErrorCode()));
2088  }
2089  };
2090 
2091  if (dml_transaction_parameters_->tableIsTemporary()) {
2092  // hold owned target exprs during execution if rewriting
2093  auto query_rewrite = std::make_unique<QueryRewriter>(table_infos, executor_);
2094  // rewrite temp table updates to generate the full column by moving the where
2095  // clause into a case if such a rewrite is not possible, bail on the update
2096  // operation build an expr for the update target
2097  auto update_transaction_params =
2098  dynamic_cast<UpdateTransactionParameters*>(dml_transaction_parameters_.get());
2099  CHECK(update_transaction_params);
2100  const auto td = update_transaction_params->getTableDescriptor();
2101  CHECK(td);
2102  const auto update_column_names = update_transaction_params->getUpdateColumnNames();
2103  if (update_column_names.size() > 1) {
2104  throw std::runtime_error(
2105  "Multi-column update is not yet supported for temporary tables.");
2106  }
2107 
2108  const auto cd =
2109  catalog->getMetadataForColumn(td->tableId, update_column_names.front());
2110  CHECK(cd);
2111  auto projected_column_to_update = makeExpr<Analyzer::ColumnVar>(
2112  cd->columnType,
2113  shared::ColumnKey{catalog->getDatabaseId(), td->tableId, cd->columnId},
2114  0);
2115  const auto rewritten_exe_unit = query_rewrite->rewriteColumnarUpdate(
2116  work_unit.exe_unit, projected_column_to_update);
2117  if (rewritten_exe_unit.target_exprs.front()->get_type_info().is_varlen()) {
2118  throw std::runtime_error(
2119  "Variable length updates not yet supported on temporary tables.");
2120  }
2121  execute_update_ra_exe_unit(rewritten_exe_unit, is_aggregate);
2122  } else {
2123  execute_update_ra_exe_unit(work_unit.exe_unit, is_aggregate);
2124  }
2125  };
2126 
2127  if (auto compound = dynamic_cast<const RelCompound*>(node)) {
2128  auto work_unit = createCompoundWorkUnit(compound, SortInfo(), eo_in);
2129 
2130  execute_update_for_node(compound, work_unit, compound->isAggregate());
2131  } else if (auto project = dynamic_cast<const RelProject*>(node)) {
2132  auto work_unit = createProjectWorkUnit(project, SortInfo(), eo_in);
2133 
2134  if (project->isSimple()) {
2135  CHECK_EQ(size_t(1), project->inputCount());
2136  const auto input_ra = project->getInput(0);
2137  if (dynamic_cast<const RelSort*>(input_ra)) {
2138  const auto& input_table =
2139  get_temporary_table(&temporary_tables_, -input_ra->getId());
2140  CHECK(input_table);
2141  work_unit.exe_unit.scan_limit = input_table->rowCount();
2142  }
2143  }
2144  if (project->hasWindowFunctionExpr() || project->hasPushedDownWindowExpr()) {
2145  // the first condition means this project node has at least one window function
2146  // and the second condition indicates that this project node falls into
2147  // one of the following cases:
2148  // 1) window function expression on a multi-fragmented table
2149  // 2) window function expression is too complex to evaluate without codegen:
2150  // i.e., sum(x+y+z) instead of sum(x) -> we currently do not support codegen to
2151  // evaluate such a complex window function expression
2152  // 3) nested window function expression
2153  // but currently we do not support update on a multi-fragmented table having
2154  // window function, so the second condition only refers to non-fragmented table with
2155  // cases 2) or 3)
2156  // if at least one of two conditions satisfy, we must compute corresponding window
2157  // context before entering `execute_update_for_node` to properly update the table
2158  if (!leaf_results_.empty()) {
2159  throw std::runtime_error(
2160  "Update query having window function is not yet supported in distributed "
2161  "mode.");
2162  }
2163  ColumnCacheMap column_cache;
2165  computeWindow(work_unit, co, eo_in, column_cache, queue_time_ms);
2166  }
2167  execute_update_for_node(project, work_unit, false);
2168  } else {
2169  throw std::runtime_error("Unsupported parent node for update: " +
2170  node->toString(RelRexToStringConfig::defaults()));
2171  }
2172 }
#define CHECK_EQ(x, y)
Definition: Logger.h:301
std::optional< std::function< void()> > post_execution_callback_
int32_t getErrorCode() const
Definition: ErrorHandling.h:55
WorkUnit createProjectWorkUnit(const RelProject *, const SortInfo &, const ExecutionOptions &eo)
TemporaryTables temporary_tables_
Driver for running cleanup processes on a table. TableOptimizer provides functions for various cleanu...
std::unordered_map< unsigned, AggregatedResult > leaf_results_
void computeWindow(const WorkUnit &work_unit, const CompilationOptions &co, const ExecutionOptions &eo, ColumnCacheMap &column_cache_map, const int64_t queue_time_ms)
WorkUnit createCompoundWorkUnit(const RelCompound *, const SortInfo &, const ExecutionOptions &eo)
const ResultSetPtr & get_temporary_table(const TemporaryTables *temporary_tables, const int table_id)
Definition: Execute.h:246
StorageIOFacility::UpdateCallback yieldUpdateCallback(UpdateTransactionParameters &update_parameters)
static CompilationOptions makeCpuOnly(const CompilationOptions &in)
ExecutorDeviceType device_type
static RelRexToStringConfig defaults()
Definition: RelAlgDag.h:78
std::unordered_map< shared::TableKey, std::unordered_map< int, std::shared_ptr< const ColumnarResults >>> ColumnCacheMap
#define CHECK(condition)
Definition: Logger.h:291
std::vector< InputTableInfo > get_table_infos(const std::vector< InputDescriptor > &input_descs, Executor *executor)
#define DEBUG_TIMER(name)
Definition: Logger.h:412
static void clearExternalCaches(bool for_update, const TableDescriptor *td, const int current_db_id)
Definition: Execute.h:438
static std::string getErrorMessageFromCode(const int32_t error_code)
std::unique_ptr< TransactionParameters > dml_transaction_parameters_
Executor * executor_

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

ExecutionResult RelAlgExecutor::executeWorkUnit ( const WorkUnit & work_unit,
const std::vector< TargetMetaInfo > &  targets_meta,
const bool  is_agg,
const CompilationOptions & co_in,
const ExecutionOptions & eo_in,
RenderInfo * render_info,
const int64_t  queue_time_ms,
const std::optional< size_t >  previous_count = std::nullopt 
)
private

Definition at line 3647 of file RelAlgExecutor.cpp.

References RelAlgExecutor::WorkUnit::body, build_render_targets(), anonymous_namespace{RelAlgExecutor.cpp}::can_output_columnar(), anonymous_namespace{RelAlgExecutor.cpp}::can_use_bump_allocator(), canUseResultsetCache(), CHECK, CHECK_EQ, CHECK_GT, anonymous_namespace{RelAlgExecutor.cpp}::check_none_encoded_string_cast_tuple_limit(), anonymous_namespace{RelAlgExecutor.cpp}::compute_output_buffer_size(), computeWindow(), CPU, DEBUG_TIMER, anonymous_namespace{RelAlgExecutor.cpp}::decide_approx_count_distinct_implementation(), RelRexToStringConfig::defaults(), RegisteredQueryHint::defaults(), RelAlgExecutor::WorkUnit::exe_unit, anonymous_namespace{RelAlgExecutor.cpp}::exe_unit_has_quals(), executor_, Extern, g_allow_auto_resultset_caching, g_auto_resultset_caching_threshold, g_big_group_threshold, g_cluster, g_columnar_large_projections_threshold, g_enable_window_functions, g_estimator_failure_max_groupby_size, Catalog_Namespace::get_metadata_for_table(), get_table_infos(), QueryExecutionError::getErrorCode(), getFilteredCountAll(), getNDVEstimation(), ScanNodeTableKeyCollector::getScanNodeTableKey(), GPU, anonymous_namespace{RelAlgExecutor.cpp}::groups_approx_upper_bound(), handleOutOfMemoryRetry(), handlePersistentError(), hasStepForUnion(), INJECT_TIMER, RelAlgExecutionUnit::input_descs, anonymous_namespace{RelAlgExecutor.cpp}::is_agg(), is_validate_or_explain_query(), anonymous_namespace{RelAlgExecutor.cpp}::is_window_execution_unit(), heavyai::InSituFlagsOwnerInterface::isInSitu(), isRowidLookup(), kCudaBlockSize, kCudaGridSize, leaf_results_, RelAlgExecutor::WorkUnit::max_groups_buffer_entry_guess, query_dag_, CardinalityEstimationRequired::range(), run_benchmark_import::res, WindowProjectNodeContext::reset(), run_benchmark_import::result, selectFiltersToBePushedDown(), anonymous_namespace{RelAlgExecutor.cpp}::should_output_columnar(), RelAlgExecutionUnit::target_exprs, target_exprs_owned_, timer_start(), timer_stop(), use_speculative_top_n(), VLOG, and 
QueryExecutionError::wasMultifragKernelLaunch().

Referenced by executeAggregate(), executeCompound(), executeFilter(), executeProject(), executeSort(), and executeUnion().

3655  {
3656  INJECT_TIMER(executeWorkUnit);
3657  auto timer = DEBUG_TIMER(__func__);
3658  auto query_exec_time_begin = timer_start();
3659 
3660  const auto query_infos = get_table_infos(work_unit.exe_unit.input_descs, executor_);
3661  check_none_encoded_string_cast_tuple_limit(query_infos, work_unit.exe_unit);
3662 
3663  auto co = co_in;
3664  auto eo = eo_in;
3665  ColumnCacheMap column_cache;
3666  ScopeGuard clearWindowContextIfNecessary = [&]() {
3667  if (is_window_execution_unit(work_unit.exe_unit)) {
3668  WindowProjectNodeContext::reset(executor_);
3669  }
3670  };
3671  if (is_window_execution_unit(work_unit.exe_unit)) {
3672  if (!g_enable_window_functions) {
3673  throw std::runtime_error("Window functions support is disabled");
3674  }
3675  co.device_type = ExecutorDeviceType::CPU;
3676  co.allow_lazy_fetch = false;
3677  computeWindow(work_unit, co, eo, column_cache, queue_time_ms);
3678  }
3679  if (!eo.just_explain && eo.find_push_down_candidates) {
3680  // find potential candidates:
3681  VLOG(1) << "Try to find filter predicate push-down candidate.";
3682  auto selected_filters = selectFiltersToBePushedDown(work_unit, co, eo);
3683  if (!selected_filters.empty() || eo.just_calcite_explain) {
3684  VLOG(1) << "Found " << selected_filters.size()
3685  << " filter(s) to be pushed down. Re-create a query plan based on pushed "
3686  "filter predicate(s).";
3687  return ExecutionResult(selected_filters, eo.find_push_down_candidates);
3688  }
3689  VLOG(1) << "Continue with the current query plan";
3690  }
3691  if (render_info && render_info->isInSitu()) {
3692  co.allow_lazy_fetch = false;
3693  }
3694  const auto body = work_unit.body;
3695  CHECK(body);
3696  auto it = leaf_results_.find(body->getId());
3697  VLOG(3) << "body->getId()=" << body->getId()
3698  << " body->toString()=" << body->toString(RelRexToStringConfig::defaults())
3699  << " it==leaf_results_.end()=" << (it == leaf_results_.end());
3700  if (it != leaf_results_.end()) {
3701  executor_->addTransientStringLiterals(work_unit.exe_unit,
3702  executor_->row_set_mem_owner_);
3703  auto& aggregated_result = it->second;
3704  auto& result_rows = aggregated_result.rs;
3705  ExecutionResult result(result_rows, aggregated_result.targets_meta);
3706  body->setOutputMetainfo(aggregated_result.targets_meta);
3707  if (render_info) {
3708  build_render_targets(*render_info, work_unit.exe_unit.target_exprs, targets_meta);
3709  }
3710  return result;
3711  }
3712  const auto table_infos = get_table_infos(work_unit.exe_unit, executor_);
3713 
3714  auto ra_exe_unit = decide_approx_count_distinct_implementation(
3715  work_unit.exe_unit, table_infos, executor_, co.device_type, target_exprs_owned_);
3716 
3717  // register query hint if query_dag_ is valid
3718  ra_exe_unit.query_hint = RegisteredQueryHint::defaults();
3719  if (query_dag_) {
3720  auto candidate = query_dag_->getQueryHint(body);
3721  if (candidate) {
3722  ra_exe_unit.query_hint = *candidate;
3723  }
3724  }
3725 
3726  const auto& query_hints = ra_exe_unit.query_hint;
3727  ScopeGuard reset_cuda_block_grid_sizes = [&,
3728  orig_block_size = executor_->blockSize(),
3729  orig_grid_size = executor_->gridSize()]() {
3730  if (executor_->getDataMgr()->getCudaMgr()) {
3731  if (query_hints.isHintRegistered(QueryHint::kCudaBlockSize)) {
3732  if (orig_block_size) {
3733  executor_->setBlockSize(orig_block_size);
3734  } else {
3735  executor_->resetBlockSize();
3736  }
3737  }
3738  if (query_hints.isHintRegistered(QueryHint::kCudaGridSize)) {
3739  if (orig_grid_size) {
3740  executor_->setGridSize(orig_grid_size);
3741  } else {
3742  executor_->resetGridSize();
3743  }
3744  }
3745  }
3746  };
3747 
3748  if (co.device_type == ExecutorDeviceType::GPU) {
3749  if (query_hints.isHintRegistered(QueryHint::kCudaGridSize)) {
3750  if (!executor_->getDataMgr()->getCudaMgr()) {
3751  VLOG(1) << "Skip CUDA grid size query hint: cannot detect CUDA device";
3752  } else {
3753  const auto num_sms = executor_->cudaMgr()->getMinNumMPsForAllDevices();
3754  const auto new_grid_size = static_cast<unsigned>(
3755  std::max(1.0, std::round(num_sms * query_hints.cuda_grid_size_multiplier)));
3756  const auto default_grid_size = executor_->gridSize();
3757  if (new_grid_size != default_grid_size) {
3758  VLOG(1) << "Change CUDA grid size: " << default_grid_size
3759  << " (default_grid_size) -> " << new_grid_size << " (# SMs * "
3760  << query_hints.cuda_grid_size_multiplier << ")";
3761  // todo (yoonmin): do we need to check a hard limit?
3762  executor_->setGridSize(new_grid_size);
3763  } else {
3764  VLOG(1) << "Skip CUDA grid size query hint: invalid grid size";
3765  }
3766  }
3767  }
3768  if (query_hints.isHintRegistered(QueryHint::kCudaBlockSize)) {
3769  if (!executor_->getDataMgr()->getCudaMgr()) {
3770  VLOG(1) << "Skip CUDA block size query hint: cannot detect CUDA device";
3771  } else {
3772  int cuda_block_size = query_hints.cuda_block_size;
3773  int warp_size = executor_->warpSize();
3774  if (cuda_block_size >= warp_size) {
3775  cuda_block_size = (cuda_block_size + warp_size - 1) / warp_size * warp_size;
3776  VLOG(1) << "Change CUDA block size w.r.t warp size (" << warp_size
3777  << "): " << executor_->blockSize() << " -> " << cuda_block_size;
3778  } else {
3779  VLOG(1) << "Change CUDA block size: " << executor_->blockSize() << " -> "
3780  << cuda_block_size;
3781  }
3782  executor_->setBlockSize(cuda_block_size);
3783  }
3784  }
3785  }
3786 
3787  auto max_groups_buffer_entry_guess = work_unit.max_groups_buffer_entry_guess;
3788  if (is_window_execution_unit(ra_exe_unit)) {
3789  CHECK_EQ(table_infos.size(), size_t(1));
3790  CHECK_EQ(table_infos.front().info.fragments.size(), size_t(1));
3791  max_groups_buffer_entry_guess =
3792  table_infos.front().info.fragments.front().getNumTuples();
3793  ra_exe_unit.scan_limit = max_groups_buffer_entry_guess;
3794  } else if (compute_output_buffer_size(ra_exe_unit) && !isRowidLookup(work_unit)) {
3795  if (previous_count && !exe_unit_has_quals(ra_exe_unit)) {
3796  ra_exe_unit.scan_limit = *previous_count;
3797  } else {
3798  // TODO(adb): enable bump allocator path for render queries
3799  if (can_use_bump_allocator(ra_exe_unit, co, eo) && !render_info) {
3800  ra_exe_unit.scan_limit = 0;
3801  ra_exe_unit.use_bump_allocator = true;
3802  } else if (eo.executor_type == ::ExecutorType::Extern) {
3803  ra_exe_unit.scan_limit = 0;
3804  } else if (!eo.just_explain) {
3805  const auto filter_count_all = getFilteredCountAll(ra_exe_unit, true, co, eo);
3806  if (filter_count_all) {
3807  ra_exe_unit.scan_limit = std::max(*filter_count_all, size_t(1));
3808  VLOG(1) << "Set a new scan limit from filtered_count_all: "
3809  << ra_exe_unit.scan_limit;
3810  auto const has_limit_value = ra_exe_unit.sort_info.limit.has_value();
3811  auto const top_k_sort_query =
3812  has_limit_value && !ra_exe_unit.sort_info.order_entries.empty();
3813  // top-k sort query needs to get a global result before sorting, so we cannot
3814  // apply LIMIT value at this point
3815  if (has_limit_value && !top_k_sort_query &&
3816  ra_exe_unit.scan_limit > ra_exe_unit.sort_info.limit.value()) {
3817  ra_exe_unit.scan_limit = ra_exe_unit.sort_info.limit.value();
3818  VLOG(1) << "Override scan limit to LIMIT value: " << ra_exe_unit.scan_limit;
3819  }
3820  }
3821  }
3822  }
3823  }
3824 
3825  // when output_columnar_hint is true here, it means either 1) columnar output
3826  // configuration is on or 2) a user hint is given but we have to disable it if some
3827  // requirements are not satisfied
3828  if (can_output_columnar(ra_exe_unit, render_info, body)) {
3829  if (!eo.output_columnar_hint && should_output_columnar(ra_exe_unit)) {
3830  VLOG(1) << "Using columnar layout for projection as output size of "
3831  << ra_exe_unit.scan_limit << " rows exceeds threshold of "
3832  << g_columnar_large_projections_threshold
3833  << " or some target uses FlatBuffer memory layout.";
3834  eo.output_columnar_hint = true;
3835  }
3836  } else {
3837  eo.output_columnar_hint = false;
3838  }
3839 
3840  ExecutionResult result{std::make_shared<ResultSet>(std::vector<TargetInfo>{},
3841  co.device_type,
3843  nullptr,
3844  executor_->blockSize(),
3845  executor_->gridSize()),
3846  {}};
3847 
3848  auto execute_and_handle_errors = [&](const auto max_groups_buffer_entry_guess_in,
3849  const bool has_cardinality_estimation,
3850  const bool has_ndv_estimation) -> ExecutionResult {
3851  // Note that the groups buffer entry guess may be modified during query execution.
3852  // Create a local copy so we can track those changes if we need to attempt a retry
3853  // due to OOM
3854  auto local_groups_buffer_entry_guess = max_groups_buffer_entry_guess_in;
3855  try {
3856  return {executor_->executeWorkUnit(local_groups_buffer_entry_guess,
3857  is_agg,
3858  table_infos,
3859  ra_exe_unit,
3860  co,
3861  eo,
3862  render_info,
3863  has_cardinality_estimation,
3864  column_cache),
3865  targets_meta};
3866  } catch (const QueryExecutionError& e) {
3867  if (!has_ndv_estimation && e.getErrorCode() < 0) {
3868  throw CardinalityEstimationRequired(/*range=*/0);
3869  }
3870  handlePersistentError(e.getErrorCode());
3871  return handleOutOfMemoryRetry(
3872  {ra_exe_unit, work_unit.body, local_groups_buffer_entry_guess},
3873  targets_meta,
3874  is_agg,
3875  co,
3876  eo,
3877  render_info,
3878  e.wasMultifragKernelLaunch(),
3879  queue_time_ms);
3880  }
3881  };
3882 
3883  auto use_resultset_cache = canUseResultsetCache(eo, render_info);
3884  for (const auto& table_info : table_infos) {
3885  const auto db_id = table_info.table_key.db_id;
3886  if (db_id > 0) {
3887  const auto td = Catalog_Namespace::get_metadata_for_table(table_info.table_key);
3888  if (td && (td->isTemporaryTable() || td->isView)) {
3889  use_resultset_cache = false;
3890  if (eo.keep_result) {
3891  VLOG(1) << "Query hint \'keep_result\' is ignored since a query has either "
3892  "temporary table or view";
3893  }
3894  }
3895  }
3896  }
3897 
3898  CardinalityCacheKey cache_key{ra_exe_unit};
3899  try {
3900  auto cached_cardinality = executor_->getCachedCardinality(cache_key);
3901  auto card = cached_cardinality.second;
3902  if (cached_cardinality.first && card >= 0) {
3903  VLOG(1) << "Use cached cardinality for max_groups_buffer_entry_guess: " << card;
3904  result = execute_and_handle_errors(
3905  card, /*has_cardinality_estimation=*/true, /*has_ndv_estimation=*/false);
3906  } else {
3907  VLOG(1) << "Use default cardinality for max_groups_buffer_entry_guess: "
3908  << max_groups_buffer_entry_guess;
3909  result = execute_and_handle_errors(
3910  max_groups_buffer_entry_guess,
3911  groups_approx_upper_bound(table_infos).first <= g_big_group_threshold,
3912  /*has_ndv_estimation=*/false);
3913  }
3914  } catch (const CardinalityEstimationRequired& e) {
3915  // check the cardinality cache
3916  auto cached_cardinality = executor_->getCachedCardinality(cache_key);
3917  auto card = cached_cardinality.second;
3918  if (cached_cardinality.first && card >= 0) {
3919  VLOG(1) << "CardinalityEstimationRequired, Use cached cardinality for "
3920  "max_groups_buffer_entry_guess: "
3921  << card;
3922  result = execute_and_handle_errors(card, true, /*has_ndv_estimation=*/true);
3923  } else {
3924  const auto ndv_groups_estimation =
3925  getNDVEstimation(work_unit, e.range(), is_agg, co, eo);
3926  const auto estimated_groups_buffer_entry_guess =
3927  ndv_groups_estimation > 0
3928  ? 2 * ndv_groups_estimation
3929  : std::min(groups_approx_upper_bound(table_infos).first,
3930  g_estimator_failure_max_groupby_size);
3931  CHECK_GT(estimated_groups_buffer_entry_guess, size_t(0));
3932  VLOG(1) << "CardinalityEstimationRequired, Use ndv_estimation: "
3933  << ndv_groups_estimation
3934  << ", cardinality for estimated_groups_buffer_entry_guess: "
3935  << estimated_groups_buffer_entry_guess;
3936  result = execute_and_handle_errors(
3937  estimated_groups_buffer_entry_guess, true, /*has_ndv_estimation=*/true);
3938  if (!(eo.just_validate || eo.just_explain)) {
3939  executor_->addToCardinalityCache(cache_key, estimated_groups_buffer_entry_guess);
3940  }
3941  }
3942  }
3943 
3944  result.setQueueTime(queue_time_ms);
3945  if (render_info) {
3946  build_render_targets(*render_info, work_unit.exe_unit.target_exprs, targets_meta);
3947  if (render_info->isInSitu()) {
3948  // return an empty result (with the same queue time, and zero render time)
3949  return {std::make_shared<ResultSet>(
3950  queue_time_ms,
3951  0,
3952  executor_->row_set_mem_owner_
3953  ? executor_->row_set_mem_owner_->cloneStrDictDataOnly()
3954  : nullptr),
3955  {}};
3956  }
3957  }
3958 
3959  for (auto& target_info : result.getTargetsMeta()) {
3960  if (target_info.get_type_info().is_string() &&
3961  !target_info.get_type_info().is_dict_encoded_string()) {
3962  // currently, we do not support resultset caching if non-encoded string is projected
3963  use_resultset_cache = false;
3964  if (eo.keep_result) {
3965  VLOG(1) << "Query hint \'keep_result\' is ignored since a query has non-encoded "
3966  "string column projection";
3967  }
3968  }
3969  }
3970 
3971  const auto res = result.getDataPtr();
3972  auto allow_auto_caching_resultset =
3973  res && res->hasValidBuffer() && g_allow_auto_resultset_caching &&
3974  res->getBufferSizeBytes(co.device_type) <= g_auto_resultset_caching_threshold;
3975  if (use_resultset_cache && (eo.keep_result || allow_auto_caching_resultset)) {
3976  auto query_exec_time = timer_stop(query_exec_time_begin);
3977  res->setExecTime(query_exec_time);
3978  res->setQueryPlanHash(ra_exe_unit.query_plan_dag_hash);
3979  res->setTargetMetaInfo(body->getOutputMetainfo());
3980  auto input_table_keys = ScanNodeTableKeyCollector::getScanNodeTableKey(body);
3981  res->setInputTableKeys(std::move(input_table_keys));
3982  if (allow_auto_caching_resultset) {
3983  VLOG(1) << "Automatically keep query resultset to recycler";
3984  }
3985  res->setUseSpeculativeTopNSort(
3986  use_speculative_top_n(ra_exe_unit, res->getQueryMemDesc()));
3987  executor_->getResultSetRecyclerHolder().putQueryResultSetToCache(
3988  ra_exe_unit.query_plan_dag_hash,
3989  res->getInputTableKeys(),
3990  res,
3991  res->getBufferSizeBytes(co.device_type),
3993  } else {
3994  if (eo.keep_result) {
3995  if (g_cluster) {
3996  VLOG(1) << "Query hint \'keep_result\' is ignored since we do not support "
3997  "resultset recycling on distributed mode";
3998  } else if (hasStepForUnion()) {
3999  VLOG(1) << "Query hint \'keep_result\' is ignored since a query has union-(all) "
4000  "operator";
4001  } else if (render_info && render_info->isInSitu()) {
4002  VLOG(1) << "Query hint \'keep_result\' is ignored since a query is classified as "
4003  "a in-situ rendering query";
4004  } else if (is_validate_or_explain_query(eo)) {
4005  VLOG(1) << "Query hint \'keep_result\' is ignored since a query is either "
4006  "validate or explain query";
4007  } else {
4008  VLOG(1) << "Query hint \'keep_result\' is ignored";
4009  }
4010  }
4011  }
4012 
4013  return result;
4014 }
bool is_agg(const Analyzer::Expr *expr)
#define CHECK_EQ(x, y)
Definition: Logger.h:301
int32_t getErrorCode() const
Definition: ErrorHandling.h:55
bool can_use_bump_allocator(const RelAlgExecutionUnit &ra_exe_unit, const CompilationOptions &co, const ExecutionOptions &eo)
bool is_validate_or_explain_query(const ExecutionOptions &eo)
std::pair< size_t, shared::TableKey > groups_approx_upper_bound(const std::vector< InputTableInfo > &table_infos)
bool use_speculative_top_n(const RelAlgExecutionUnit &ra_exe_unit, const QueryMemoryDescriptor &query_mem_desc)
const TableDescriptor * get_metadata_for_table(const ::shared::TableKey &table_key, bool populate_fragmenter)
std::vector< PushedDownFilterInfo > selectFiltersToBePushedDown(const RelAlgExecutor::WorkUnit &work_unit, const CompilationOptions &co, const ExecutionOptions &eo)
TypeR::rep timer_stop(Type clock_begin)
Definition: measure.h:48
std::unordered_map< unsigned, AggregatedResult > leaf_results_
#define CHECK_GT(x, y)
Definition: Logger.h:305
bool is_window_execution_unit(const RelAlgExecutionUnit &ra_exe_unit)
static std::unordered_set< size_t > getScanNodeTableKey(RelAlgNode const *rel_alg_node)
bool hasStepForUnion() const
void computeWindow(const WorkUnit &work_unit, const CompilationOptions &co, const ExecutionOptions &eo, ColumnCacheMap &column_cache_map, const int64_t queue_time_ms)
bool can_output_columnar(const RelAlgExecutionUnit &ra_exe_unit, const RenderInfo *render_info, const RelAlgNode *body)
std::vector< std::shared_ptr< Analyzer::Expr > > target_exprs_owned_
void check_none_encoded_string_cast_tuple_limit(const std::vector< InputTableInfo > &query_infos, const RelAlgExecutionUnit &ra_exe_unit)
#define INJECT_TIMER(DESC)
Definition: measure.h:96
static void reset(Executor *executor)
void build_render_targets(RenderInfo &render_info, const std::vector< Analyzer::Expr * > &work_unit_target_exprs, const std::vector< TargetMetaInfo > &targets_meta)
size_t g_big_group_threshold
Definition: Execute.cpp:115
ExecutionResult handleOutOfMemoryRetry(const RelAlgExecutor::WorkUnit &work_unit, const std::vector< TargetMetaInfo > &targets_meta, const bool is_agg, const CompilationOptions &co, const ExecutionOptions &eo, RenderInfo *render_info, const bool was_multifrag_kernel_launch, const int64_t queue_time_ms)
std::optional< size_t > getFilteredCountAll(const RelAlgExecutionUnit &ra_exe_unit, const bool is_agg, const CompilationOptions &co, const ExecutionOptions &eo)
size_t getNDVEstimation(const WorkUnit &work_unit, const int64_t range, const bool is_agg, const CompilationOptions &co, const ExecutionOptions &eo)
bool g_allow_auto_resultset_caching
Definition: Execute.cpp:158
bool g_enable_window_functions
Definition: Execute.cpp:116
std::unique_ptr< RelAlgDag > query_dag_
ExecutionResult executeWorkUnit(const WorkUnit &work_unit, const std::vector< TargetMetaInfo > &targets_meta, const bool is_agg, const CompilationOptions &co_in, const ExecutionOptions &eo_in, RenderInfo *, const int64_t queue_time_ms, const std::optional< size_t > previous_count=std::nullopt)
static RegisteredQueryHint defaults()
Definition: QueryHint.h:364
size_t g_estimator_failure_max_groupby_size
static RelRexToStringConfig defaults()
Definition: RelAlgDag.h:78
std::unordered_map< shared::TableKey, std::unordered_map< int, std::shared_ptr< const ColumnarResults >>> ColumnCacheMap
bool canUseResultsetCache(const ExecutionOptions &eo, RenderInfo *render_info) const
bool isRowidLookup(const WorkUnit &work_unit)
bool exe_unit_has_quals(const RelAlgExecutionUnit ra_exe_unit)
static void handlePersistentError(const int32_t error_code)
bool compute_output_buffer_size(const RelAlgExecutionUnit &ra_exe_unit)
bool should_output_columnar(const RelAlgExecutionUnit &ra_exe_unit)
bool wasMultifragKernelLaunch() const
Definition: ErrorHandling.h:57
#define CHECK(condition)
Definition: Logger.h:291
std::vector< InputTableInfo > get_table_infos(const std::vector< InputDescriptor > &input_descs, Executor *executor)
#define DEBUG_TIMER(name)
Definition: Logger.h:412
bool g_cluster
RelAlgExecutionUnit decide_approx_count_distinct_implementation(const RelAlgExecutionUnit &ra_exe_unit_in, const std::vector< InputTableInfo > &table_infos, const Executor *executor, const ExecutorDeviceType device_type_in, std::vector< std::shared_ptr< Analyzer::Expr >> &target_exprs_owned)
size_t g_columnar_large_projections_threshold
Executor * executor_
#define VLOG(n)
Definition: Logger.h:388
Type timer_start()
Definition: measure.h:42
size_t g_auto_resultset_caching_threshold
Definition: Execute.cpp:164

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

std::string RelAlgExecutor::getErrorMessageFromCode ( const int32_t  error_code)
static

Definition at line 4336 of file RelAlgExecutor.cpp.

References anonymous_namespace{RelAlgExecutor.cpp}::getErrorDescription(), and to_string().

Referenced by executeDelete(), executeUpdate(), getNDVEstimation(), and handlePersistentError().

4336  {
4337  if (error_code < 0) {
4338  return "Ran out of slots in the query output buffer";
4339  }
4340  const auto errorInfo = getErrorDescription(error_code);
4341 
4342  if (errorInfo.code) {
4343  return errorInfo.code + ": "s + errorInfo.description;
4344  } else {
4345  return "Other error: code "s + std::to_string(error_code);
4346  }
4347 }
ErrorInfo getErrorDescription(const int32_t error_code)
std::string to_string(char const *&&v)
def error_code
Definition: report.py:244

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

Executor * RelAlgExecutor::getExecutor ( ) const

Definition at line 771 of file RelAlgExecutor.cpp.

References executor_.

771  {
772  return executor_;
773 }
Executor * executor_
std::optional< size_t > RelAlgExecutor::getFilteredCountAll ( const RelAlgExecutionUnit ra_exe_unit,
const bool  is_agg,
const CompilationOptions co,
const ExecutionOptions eo 
)
private

Definition at line 4049 of file RelAlgExecutor.cpp.

References CHECK, CHECK_EQ, CHECK_GE, RelAlgExecutionUnit::createCountAllExecutionUnit(), ExecutionOptions::estimate_output_cardinality, executor_, g_bigint_count, get_table_infos(), anonymous_namespace{RelAlgExecutor.cpp}::get_table_name_from_table_key(), anonymous_namespace{RelAlgExecutor.cpp}::groups_approx_upper_bound(), hasDeletedRowInQuery(), anonymous_namespace{RelAlgExecutor.cpp}::is_projection(), RelAlgExecutionUnit::join_quals, kBIGINT, kCOUNT, kINT, LOG, RelAlgExecutionUnit::per_device_cardinality, RelAlgExecutionUnit::quals, RelAlgExecutionUnit::simple_quals, VLOG, and logger::WARNING.

Referenced by executeWorkUnit().

4053  {
4054  auto const input_tables_info = get_table_infos(ra_exe_unit, executor_);
4055  if (is_projection(ra_exe_unit) && ra_exe_unit.simple_quals.empty() &&
4056  ra_exe_unit.quals.empty() && ra_exe_unit.join_quals.empty() &&
4057  !hasDeletedRowInQuery(input_tables_info)) {
4058  auto const max_row_info = groups_approx_upper_bound(input_tables_info);
4059  auto const num_rows = max_row_info.first;
4060  auto const table_name = get_table_name_from_table_key(max_row_info.second);
4061  VLOG(1) << "Short-circuiting filtered count query for the projection query "
4062  "containing input table "
4063  << table_name << ": return its table cardinality " << num_rows << " instead";
4064  return std::make_optional<size_t>(num_rows);
4065  }
4066  const auto count =
4067  makeExpr<Analyzer::AggExpr>(SQLTypeInfo(g_bigint_count ? kBIGINT : kINT, false),
4068  kCOUNT,
4069  nullptr,
4070  false,
4071  nullptr);
4072  const auto count_all_exe_unit = ra_exe_unit.createCountAllExecutionUnit(count.get());
4073  size_t one{1};
4074  ResultSetPtr count_all_result;
4075  try {
4076  VLOG(1) << "Try to execute pre-flight counts query";
4077  ColumnCacheMap column_cache;
4078  ExecutionOptions copied_eo = eo;
4079  copied_eo.estimate_output_cardinality = true;
4080  count_all_result = executor_->executeWorkUnit(one,
4081  is_agg,
4082  input_tables_info,
4083  count_all_exe_unit,
4084  co,
4085  copied_eo,
4086  nullptr,
4087  false,
4088  column_cache);
4089  ra_exe_unit.per_device_cardinality = count_all_exe_unit.per_device_cardinality;
4090  } catch (const foreign_storage::ForeignStorageException& error) {
4091  throw;
4092  } catch (const QueryMustRunOnCpu&) {
4093  // force a retry of the top level query on CPU
4094  throw;
4095  } catch (const std::exception& e) {
4096  LOG(WARNING) << "Failed to run pre-flight filtered count with error " << e.what();
4097  return std::nullopt;
4098  }
4099  const auto count_row = count_all_result->getNextRow(false, false);
4100  CHECK_EQ(size_t(1), count_row.size());
4101  const auto& count_tv = count_row.front();
4102  const auto count_scalar_tv = boost::get<ScalarTargetValue>(&count_tv);
4103  CHECK(count_scalar_tv);
4104  const auto count_ptr = boost::get<int64_t>(count_scalar_tv);
4105  CHECK(count_ptr);
4106  CHECK_GE(*count_ptr, 0);
4107  auto count_upper_bound = static_cast<size_t>(*count_ptr);
4108  return std::max(count_upper_bound, size_t(1));
4109 }
bool is_agg(const Analyzer::Expr *expr)
#define CHECK_EQ(x, y)
Definition: Logger.h:301
std::pair< size_t, shared::TableKey > groups_approx_upper_bound(const std::vector< InputTableInfo > &table_infos)
#define LOG(tag)
Definition: Logger.h:285
#define CHECK_GE(x, y)
Definition: Logger.h:306
std::shared_ptr< ResultSet > ResultSetPtr
std::vector< std::pair< std::vector< size_t >, size_t > > per_device_cardinality
const JoinQualsPerNestingLevel join_quals
bool g_bigint_count
RelAlgExecutionUnit createCountAllExecutionUnit(Analyzer::Expr *replacement_target) const
bool is_projection(const RelAlgExecutionUnit &ra_exe_unit)
Definition: sqldefs.h:78
std::unordered_map< shared::TableKey, std::unordered_map< int, std::shared_ptr< const ColumnarResults >>> ColumnCacheMap
bool hasDeletedRowInQuery(std::vector< InputTableInfo > const &) const
std::list< std::shared_ptr< Analyzer::Expr > > quals
#define CHECK(condition)
Definition: Logger.h:291
std::vector< InputTableInfo > get_table_infos(const std::vector< InputDescriptor > &input_descs, Executor *executor)
Definition: sqltypes.h:72
Executor * executor_
#define VLOG(n)
Definition: Logger.h:388
std::list< std::shared_ptr< Analyzer::Expr > > simple_quals
std::string get_table_name_from_table_key(shared::TableKey const &table_key)

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

FilterSelectivity RelAlgExecutor::getFilterSelectivity ( const std::vector< std::shared_ptr< Analyzer::Expr >> &  filter_expressions,
const CompilationOptions co,
const ExecutionOptions eo 
)
private

Given a set of filter expressions for a table, it launches a new COUNT query to compute the number of passing rows, and then generates a set of statistics related to those filters. Later, these stats are used to decide whether a filter should be pushed down or not.

Definition at line 59 of file JoinFilterPushDown.cpp.

References CHECK, CHECK_EQ, executor_, g_bigint_count, get_table_infos(), kBIGINT, kCOUNT, and kINT.

Referenced by selectFiltersToBePushedDown().

62  {
63  CollectInputColumnsVisitor input_columns_visitor;
64  std::list<std::shared_ptr<Analyzer::Expr>> quals;
65  std::unordered_set<InputColDescriptor> input_column_descriptors;
66  BindFilterToOutermostVisitor bind_filter_to_outermost;
67  for (const auto& filter_expr : filter_expressions) {
68  input_column_descriptors = input_columns_visitor.aggregateResult(
69  input_column_descriptors, input_columns_visitor.visit(filter_expr.get()));
70  quals.push_back(bind_filter_to_outermost.visit(filter_expr.get()));
71  }
72  std::vector<InputDescriptor> input_descs;
73  std::list<std::shared_ptr<const InputColDescriptor>> input_col_descs;
74  for (const auto& input_col_desc : input_column_descriptors) {
75  if (input_descs.empty()) {
76  input_descs.push_back(input_col_desc.getScanDesc());
77  } else {
78  CHECK(input_col_desc.getScanDesc() == input_descs.front());
79  }
80  input_col_descs.push_back(std::make_shared<const InputColDescriptor>(input_col_desc));
81  }
82  const auto count_expr =
83  makeExpr<Analyzer::AggExpr>(SQLTypeInfo(g_bigint_count ? kBIGINT : kINT, false),
84  kCOUNT,
85  nullptr,
86  false,
87  nullptr);
88  RelAlgExecutionUnit ra_exe_unit{input_descs,
89  input_col_descs,
90  {},
91  quals,
92  {},
93  {},
94  {count_expr.get()},
95  {},
96  nullptr,
97  SortInfo(),
98  0};
99  size_t one{1};
100  ResultSetPtr filtered_result;
101  const auto table_infos = get_table_infos(input_descs, executor_);
102  CHECK_EQ(size_t(1), table_infos.size());
103  const size_t total_rows_upper_bound = table_infos.front().info.getNumTuplesUpperBound();
104  try {
105  ColumnCacheMap column_cache;
106  filtered_result = executor_->executeWorkUnit(
107  one, true, table_infos, ra_exe_unit, co, eo, nullptr, false, column_cache);
108  } catch (...) {
109  return {false, 1.0, 0};
110  }
111  const auto count_row = filtered_result->getNextRow(false, false);
112  CHECK_EQ(size_t(1), count_row.size());
113  const auto& count_tv = count_row.front();
114  const auto count_scalar_tv = boost::get<ScalarTargetValue>(&count_tv);
115  CHECK(count_scalar_tv);
116  const auto count_ptr = boost::get<int64_t>(count_scalar_tv);
117  CHECK(count_ptr);
118  const auto rows_passing = *count_ptr;
119  const auto rows_total = std::max(total_rows_upper_bound, size_t(1));
120  return {true, static_cast<float>(rows_passing) / rows_total, total_rows_upper_bound};
121 }
#define CHECK_EQ(x, y)
Definition: Logger.h:301
std::shared_ptr< ResultSet > ResultSetPtr
bool g_bigint_count
Definition: sqldefs.h:78
std::unordered_map< shared::TableKey, std::unordered_map< int, std::shared_ptr< const ColumnarResults >>> ColumnCacheMap
#define CHECK(condition)
Definition: Logger.h:291
std::vector< InputTableInfo > get_table_infos(const std::vector< InputDescriptor > &input_descs, Executor *executor)
Definition: sqltypes.h:72
Executor * executor_

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

std::optional<RegisteredQueryHint> RelAlgExecutor::getGlobalQueryHint ( )
inline

Definition at line 178 of file RelAlgExecutor.h.

References query_dag_.

Referenced by executeRelAlgQueryNoRetry(), executeTableFunction(), and QueryRunner::QueryRunner::getParsedGlobalQueryHints().

178  {
179  return query_dag_ ? std::make_optional(query_dag_->getGlobalHints()) : std::nullopt;
180  }
std::unique_ptr< RelAlgDag > query_dag_

+ Here is the caller graph for this function:

std::pair< std::vector< unsigned >, std::unordered_map< unsigned, JoinQualsPerNestingLevel > > RelAlgExecutor::getJoinInfo ( const RelAlgNode root_node)

Definition at line 781 of file RelAlgExecutor.cpp.

References createWorkUnit(), ExecutionOptions::defaults(), getLeftDeepJoinTreesInfo(), and RelAlgVisitor< T >::visit().

781  {
782  auto sort_node = dynamic_cast<const RelSort*>(root_node);
783  if (sort_node) {
784  // we assume that test query that needs join info does not contain any sort node
785  return {};
786  }
787  auto work_unit = createWorkUnit(root_node, {}, ExecutionOptions::defaults());
789  auto left_deep_tree_ids = visitor.visit(root_node);
790  return {left_deep_tree_ids, getLeftDeepJoinTreesInfo()};
791 }
std::unordered_map< unsigned, JoinQualsPerNestingLevel > & getLeftDeepJoinTreesInfo()
WorkUnit createWorkUnit(const RelAlgNode *, const SortInfo &, const ExecutionOptions &eo)
T visit(const RelAlgNode *rel_alg) const
Definition: RelAlgVisitor.h:25
static ExecutionOptions defaults()

+ Here is the call graph for this function:

std::unordered_map<unsigned, JoinQualsPerNestingLevel>& RelAlgExecutor::getLeftDeepJoinTreesInfo ( )
inlineprivate

Definition at line 411 of file RelAlgExecutor.h.

References left_deep_join_info_.

Referenced by createAggregateWorkUnit(), createCompoundWorkUnit(), createFilterWorkUnit(), createProjectWorkUnit(), and getJoinInfo().

411  {
412  return left_deep_join_info_;
413  }
std::unordered_map< unsigned, JoinQualsPerNestingLevel > left_deep_join_info_

+ Here is the caller graph for this function:

size_t RelAlgExecutor::getNDVEstimation ( const WorkUnit work_unit,
const int64_t  range,
const bool  is_agg,
const CompilationOptions co,
const ExecutionOptions eo 
)
private

Definition at line 54 of file CardinalityEstimator.cpp.

References RelAlgExecutionUnit::createNdvExecutionUnit(), Executor::ERR_INTERRUPTED, Executor::ERR_OUT_OF_TIME, RelAlgExecutor::WorkUnit::exe_unit, executor_, get_table_infos(), QueryExecutionError::getErrorCode(), getErrorMessageFromCode(), and UNREACHABLE.

Referenced by executeWorkUnit().

58  {
59  const auto estimator_exe_unit = work_unit.exe_unit.createNdvExecutionUnit(range);
60  size_t one{1};
61  ColumnCacheMap column_cache;
62  try {
63  const auto estimator_result =
64  executor_->executeWorkUnit(one,
65  is_agg,
66  get_table_infos(work_unit.exe_unit, executor_),
67  estimator_exe_unit,
68  co,
69  eo,
70  nullptr,
71  false,
72  column_cache);
73  if (!estimator_result) {
74  return 1; // empty row set, only needs one slot
75  }
76  return estimator_result->getNDVEstimator();
77  } catch (const QueryExecutionError& e) {
79  throw std::runtime_error("Cardinality estimation query ran out of time");
80  }
82  throw std::runtime_error("Cardinality estimation query has been interrupted");
83  }
84  throw std::runtime_error("Failed to run the cardinality estimation query: " +
86  }
87  UNREACHABLE();
88  return 0;
89 }
bool is_agg(const Analyzer::Expr *expr)
int32_t getErrorCode() const
Definition: ErrorHandling.h:55
static const int32_t ERR_INTERRUPTED
Definition: Execute.h:1623
#define UNREACHABLE()
Definition: Logger.h:338
static const int32_t ERR_OUT_OF_TIME
Definition: Execute.h:1622
std::unordered_map< shared::TableKey, std::unordered_map< int, std::shared_ptr< const ColumnarResults >>> ColumnCacheMap
std::vector< InputTableInfo > get_table_infos(const std::vector< InputDescriptor > &input_descs, Executor *executor)
static std::string getErrorMessageFromCode(const int32_t error_code)
Executor * executor_

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

size_t RelAlgExecutor::getOuterFragmentCount ( const CompilationOptions co,
const ExecutionOptions eo 
)

Definition at line 492 of file RelAlgExecutor.cpp.

References CHECK, cleanupPostExecution(), createCompoundWorkUnit(), createProjectWorkUnit(), executor_, ExecutionOptions::find_push_down_candidates, get_frag_count_of_table(), getSubqueries(), ExecutionOptions::just_explain, query_dag_, setupCaching(), gpu_enabled::swap(), target_exprs_owned_, and temporary_tables_.

493  {
494  if (eo.find_push_down_candidates) {
495  return 0;
496  }
497 
498  if (eo.just_explain) {
499  return 0;
500  }
501 
502  CHECK(query_dag_);
503 
504  query_dag_->resetQueryExecutionState();
505  const auto& ra = query_dag_->getRootNode();
506 
507  auto lock = executor_->acquireExecuteMutex();
508  ScopeGuard row_set_holder = [this] { cleanupPostExecution(); };
509  setupCaching(&ra);
510 
511  ScopeGuard restore_metainfo_cache = [this] { executor_->clearMetaInfoCache(); };
512  auto ed_seq = RaExecutionSequence(&ra, executor_);
513 
514  if (!getSubqueries().empty()) {
515  return 0;
516  }
517 
518  CHECK(!ed_seq.empty());
519  if (ed_seq.size() > 1) {
520  return 0;
521  }
522 
525  executor_->temporary_tables_ = &temporary_tables_;
526 
527  auto exec_desc_ptr = ed_seq.getDescriptor(0);
528  CHECK(exec_desc_ptr);
529  auto& exec_desc = *exec_desc_ptr;
530  const auto body = exec_desc.getBody();
531  if (body->isNop()) {
532  return 0;
533  }
534 
535  const auto project = dynamic_cast<const RelProject*>(body);
536  if (project) {
537  auto work_unit = createProjectWorkUnit(project, SortInfo(), eo);
538  return get_frag_count_of_table(work_unit.exe_unit.input_descs[0].getTableKey(),
539  executor_);
540  }
541 
542  const auto compound = dynamic_cast<const RelCompound*>(body);
543  if (compound) {
544  if (compound->isDeleteViaSelect()) {
545  return 0;
546  } else if (compound->isUpdateViaSelect()) {
547  return 0;
548  } else {
549  if (compound->isAggregate()) {
550  return 0;
551  }
552 
553  const auto work_unit = createCompoundWorkUnit(compound, SortInfo(), eo);
554 
555  return get_frag_count_of_table(work_unit.exe_unit.input_descs[0].getTableKey(),
556  executor_);
557  }
558  }
559 
560  return 0;
561 }
WorkUnit createProjectWorkUnit(const RelProject *, const SortInfo &, const ExecutionOptions &eo)
TemporaryTables temporary_tables_
void setupCaching(const RelAlgNode *ra)
size_t get_frag_count_of_table(const shared::TableKey &table_key, Executor *executor)
const std::vector< std::shared_ptr< RexSubQuery > > & getSubqueries() const noexcept
WorkUnit createCompoundWorkUnit(const RelCompound *, const SortInfo &, const ExecutionOptions &eo)
std::vector< std::shared_ptr< Analyzer::Expr > > target_exprs_owned_
A container for relational algebra descriptors defining the execution order for a relational algebra ...
std::unique_ptr< RelAlgDag > query_dag_
#define CHECK(condition)
Definition: Logger.h:291
void cleanupPostExecution()
Executor * executor_
DEVICE void swap(ARGS &&...args)
Definition: gpu_enabled.h:114

+ Here is the call graph for this function:

std::unique_ptr<RelAlgDag> RelAlgExecutor::getOwnedRelAlgDag ( )
inline

Definition at line 134 of file RelAlgExecutor.h.

References CHECK, and query_dag_.

Referenced by QueryRunner::QueryRunner::getRelAlgDag().

134  {
135  CHECK(query_dag_);
136  return std::move(query_dag_);
137  }
std::unique_ptr< RelAlgDag > query_dag_
#define CHECK(condition)
Definition: Logger.h:291

+ Here is the caller graph for this function:

std::optional<RegisteredQueryHint> RelAlgExecutor::getParsedQueryHint ( const RelAlgNode node)
inline

Definition at line 168 of file RelAlgExecutor.h.

References query_dag_.

Referenced by executeRelAlgQueryNoRetry(), executeRelAlgStep(), and QueryRunner::QueryRunner::getParsedQueryHint().

168  {
169  return query_dag_ ? query_dag_->getQueryHint(node) : std::nullopt;
170  }
std::unique_ptr< RelAlgDag > query_dag_

+ Here is the caller graph for this function:

std::optional< std::unordered_map<size_t, std::unordered_map<unsigned, RegisteredQueryHint> > > RelAlgExecutor::getParsedQueryHints ( )
inline

Definition at line 174 of file RelAlgExecutor.h.

References query_dag_.

Referenced by QueryRunner::QueryRunner::getParsedQueryHints().

174  {
175  return query_dag_ ? std::make_optional(query_dag_->getQueryHints()) : std::nullopt;
176  }
std::unique_ptr< RelAlgDag > query_dag_

+ Here is the caller graph for this function:

std::unordered_set< shared::TableKey > RelAlgExecutor::getPhysicalTableIds ( ) const

Definition at line 5506 of file RelAlgExecutor.cpp.

References get_physical_table_inputs(), and getRootRelAlgNode().

5506  {
5508 }
std::unordered_set< shared::TableKey > get_physical_table_inputs(const RelAlgNode *ra)
const RelAlgNode & getRootRelAlgNode() const

+ Here is the call graph for this function:

RaExecutionSequence RelAlgExecutor::getRaExecutionSequence ( const RelAlgNode root_node,
Executor executor 
)
inline

Definition at line 199 of file RelAlgExecutor.h.

References CHECK.

Referenced by QueryRunner::QueryRunner::getRaExecutionSequence().

200  {
201  CHECK(executor);
202  CHECK(root_node);
203  return RaExecutionSequence(root_node, executor);
204  }
A container for relational algebra descriptors defining the execution order for a relational algebra ...
#define CHECK(condition)
Definition: Logger.h:291

+ Here is the caller graph for this function:

RelAlgDag* RelAlgExecutor::getRelAlgDag ( )
inline

Definition at line 139 of file RelAlgExecutor.h.

References query_dag_.

Referenced by executeRelAlgQueryNoRetry().

139  {
140  if (!query_dag_) {
141  query_dag_ = std::make_unique<RelAlgDag>();
142  }
143  return query_dag_.get();
144  }
std::unique_ptr< RelAlgDag > query_dag_

+ Here is the caller graph for this function:

std::shared_ptr< RelAlgTranslator > RelAlgExecutor::getRelAlgTranslator ( const RelAlgNode root_node)

Definition at line 4660 of file RelAlgExecutor.cpp.

References executor_, anonymous_namespace{RelAlgExecutor.cpp}::get_input_nest_levels(), anonymous_namespace{RelAlgExecutor.cpp}::get_join_type(), RelAlgNode::getInput(), anonymous_namespace{RelAlgExecutor.cpp}::left_deep_join_types(), now_, and query_state_.

4661  {
4662  auto input_to_nest_level = get_input_nest_levels(node, {});
4663  const auto left_deep_join =
4664  dynamic_cast<const RelLeftDeepInnerJoin*>(node->getInput(0));
4665  const auto join_types = left_deep_join ? left_deep_join_types(left_deep_join)
4666  : std::vector<JoinType>{get_join_type(node)};
4667  return std::make_shared<RelAlgTranslator>(
4668  query_state_, executor_, input_to_nest_level, join_types, now_, false);
4669 }
std::unordered_map< const RelAlgNode *, int > get_input_nest_levels(const RelAlgNode *ra_node, const std::vector< size_t > &input_permutation)
std::vector< JoinType > left_deep_join_types(const RelLeftDeepInnerJoin *left_deep_join)
JoinType
Definition: sqldefs.h:174
JoinType get_join_type(const RelAlgNode *ra)
std::shared_ptr< const query_state::QueryState > query_state_
Executor * executor_

+ Here is the call graph for this function:

const RelAlgNode& RelAlgExecutor::getRootRelAlgNode ( ) const
inline

Definition at line 146 of file RelAlgExecutor.h.

References CHECK, and query_dag_.

Referenced by computeColRangesCache(), computeStringDictionaryGenerations(), computeTableGenerations(), executeRelAlgQueryNoRetry(), getPhysicalTableIds(), and prepareForSystemTableExecution().

146  {
147  CHECK(query_dag_);
148  return query_dag_->getRootNode();
149  }
std::unique_ptr< RelAlgDag > query_dag_
#define CHECK(condition)
Definition: Logger.h:291

+ Here is the caller graph for this function:

std::shared_ptr<const RelAlgNode> RelAlgExecutor::getRootRelAlgNodeShPtr ( ) const
inline

Definition at line 153 of file RelAlgExecutor.h.

References CHECK, and query_dag_.

Referenced by QueryRunner::QueryRunner::getQueryInfoForDataRecyclerTest(), and QueryRunner::QueryRunner::getRootNodeFromParsedQuery().

153  {
154  CHECK(query_dag_);
155  return query_dag_->getRootNodeShPtr();
156  }
std::unique_ptr< RelAlgDag > query_dag_
#define CHECK(condition)
Definition: Logger.h:291

+ Here is the caller graph for this function:

const std::vector<std::shared_ptr<RexSubQuery> >& RelAlgExecutor::getSubqueries ( ) const
inlinenoexcept

Definition at line 163 of file RelAlgExecutor.h.

References CHECK, and query_dag_.

Referenced by executeRelAlgQueryNoRetry(), executeRelAlgQueryWithFilterPushDown(), and getOuterFragmentCount().

163  {
164  CHECK(query_dag_);
165  return query_dag_->getSubqueries();
166  };
std::unique_ptr< RelAlgDag > query_dag_
#define CHECK(condition)
Definition: Logger.h:291

+ Here is the caller graph for this function:

void RelAlgExecutor::handleNop ( RaExecutionDesc ed)
private

Definition at line 1326 of file RelAlgExecutor.cpp.

References addTemporaryTable(), CHECK, CHECK_EQ, RaExecutionDesc::getBody(), and temporary_tables_.

Referenced by executeRelAlgStep().

1326  {
1327  // just set the result of the previous node as the result of no op
1328  auto body = ed.getBody();
1329  CHECK(dynamic_cast<const RelAggregate*>(body));
1330  CHECK_EQ(size_t(1), body->inputCount());
1331  const auto input = body->getInput(0);
1332  body->setOutputMetainfo(input->getOutputMetainfo());
1333  const auto it = temporary_tables_.find(-input->getId());
1334  CHECK(it != temporary_tables_.end());
1335  // set up temp table as it could be used by the outer query or next step
1336  addTemporaryTable(-body->getId(), it->second);
1337 
1338  ed.setResult({it->second, input->getOutputMetainfo()});
1339 }
#define CHECK_EQ(x, y)
Definition: Logger.h:301
TemporaryTables temporary_tables_
void addTemporaryTable(const int table_id, const ResultSetPtr &result)
#define CHECK(condition)
Definition: Logger.h:291
const RelAlgNode * getBody() const

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

ExecutionResult RelAlgExecutor::handleOutOfMemoryRetry ( const RelAlgExecutor::WorkUnit work_unit,
const std::vector< TargetMetaInfo > &  targets_meta,
const bool  is_agg,
const CompilationOptions co,
const ExecutionOptions eo,
RenderInfo render_info,
const bool  was_multifrag_kernel_launch,
const int64_t  queue_time_ms 
)
private

Definition at line 4147 of file RelAlgExecutor.cpp.

References CHECK, anonymous_namespace{RelAlgExecutor.cpp}::decide_approx_count_distinct_implementation(), CompilationOptions::device_type, RelAlgExecutor::WorkUnit::exe_unit, executor_, RenderInfo::forceNonInSitu(), g_enable_watchdog, get_table_infos(), QueryExecutionError::getErrorCode(), handlePersistentError(), LOG, CompilationOptions::makeCpuOnly(), RelAlgExecutor::WorkUnit::max_groups_buffer_entry_guess, run_benchmark_import::result, ExecutionOptions::setNoExplainExecutionOptions(), target_exprs_owned_, RelAlgExecutionUnit::use_bump_allocator, VLOG, and logger::WARNING.

Referenced by executeWorkUnit().

4155  {
4156  // Disable the bump allocator
4157  // Note that this will have basically the same effect as using the bump allocator for
4158  // the kernel per fragment path. Need to unify the max_groups_buffer_entry_guess = 0
4159  // path and the bump allocator path for kernel per fragment execution.
4160  auto ra_exe_unit_in = work_unit.exe_unit;
4161  ra_exe_unit_in.use_bump_allocator = false;
4162 
4163  auto result = ExecutionResult{std::make_shared<ResultSet>(std::vector<TargetInfo>{},
4164  co.device_type,
4166  nullptr,
4167  executor_->blockSize(),
4168  executor_->gridSize()),
4169  {}};
4170 
4171  const auto table_infos = get_table_infos(ra_exe_unit_in, executor_);
4172  auto max_groups_buffer_entry_guess = work_unit.max_groups_buffer_entry_guess;
4173  auto eo_no_multifrag = eo;
4174  eo_no_multifrag.setNoExplainExecutionOptions(true);
4175  eo_no_multifrag.allow_multifrag = false;
4176  eo_no_multifrag.find_push_down_candidates = false;
4177  if (was_multifrag_kernel_launch) {
4178  try {
4179  // Attempt to retry using the kernel per fragment path. The smaller input size
4180  // required may allow the entire kernel to execute in GPU memory.
4181  LOG(WARNING) << "Multifrag query ran out of memory, retrying with multifragment "
4182  "kernels disabled.";
4183  const auto ra_exe_unit = decide_approx_count_distinct_implementation(
4184  ra_exe_unit_in, table_infos, executor_, co.device_type, target_exprs_owned_);
4185  ColumnCacheMap column_cache;
4186  result = {executor_->executeWorkUnit(max_groups_buffer_entry_guess,
4187  is_agg,
4188  table_infos,
4189  ra_exe_unit,
4190  co,
4191  eo_no_multifrag,
4192  nullptr,
4193  true,
4194  column_cache),
4195  targets_meta};
4196  result.setQueueTime(queue_time_ms);
4197  } catch (const QueryExecutionError& e) {
4199  LOG(WARNING) << "Kernel per fragment query ran out of memory, retrying on CPU.";
4200  }
4201  }
4202 
4203  if (render_info) {
4204  render_info->forceNonInSitu();
4205  }
4206 
4207  const auto co_cpu = CompilationOptions::makeCpuOnly(co);
4208  // Only reset the group buffer entry guess if we ran out of slots, which
4209  // suggests a
4210  // highly pathological input which prevented a good estimation of distinct tuple
4211  // count. For projection queries, this will force a per-fragment scan limit, which is
4212  // compatible with the CPU path
4213  VLOG(1) << "Resetting max groups buffer entry guess.";
4214  max_groups_buffer_entry_guess = 0;
4215 
4216  int iteration_ctr = -1;
4217  while (true) {
4218  iteration_ctr++;
4220  ra_exe_unit_in, table_infos, executor_, co_cpu.device_type, target_exprs_owned_);
4221  ColumnCacheMap column_cache;
4222  try {
4223  result = {executor_->executeWorkUnit(max_groups_buffer_entry_guess,
4224  is_agg,
4225  table_infos,
4226  ra_exe_unit,
4227  co_cpu,
4228  eo_no_multifrag,
4229  nullptr,
4230  true,
4231  column_cache),
4232  targets_meta};
4233  } catch (const QueryExecutionError& e) {
4234  // Ran out of slots
4235  if (e.getErrorCode() < 0) {
4236  // Even the conservative guess failed; it should only happen when we group
4237  // by a huge cardinality array. Maybe we should throw an exception instead?
4238  // Such a heavy query is entirely capable of exhausting all the host memory.
4239  CHECK(max_groups_buffer_entry_guess);
4240  // Only allow two iterations of increasingly large entry guesses up to a maximum
4241  // of 512MB per column per kernel
4242  if (g_enable_watchdog || iteration_ctr > 1) {
4243  throw std::runtime_error("Query ran out of output slots in the result");
4244  }
4245  max_groups_buffer_entry_guess *= 2;
4246  LOG(WARNING) << "Query ran out of slots in the output buffer, retrying with max "
4247  "groups buffer entry "
4248  "guess equal to "
4249  << max_groups_buffer_entry_guess;
4250  } else {
4252  }
4253  continue;
4254  }
4255  result.setQueueTime(queue_time_ms);
4256  return result;
4257  }
4258  return result;
4259 }
bool is_agg(const Analyzer::Expr *expr)
int32_t getErrorCode() const
Definition: ErrorHandling.h:55
void forceNonInSitu()
Definition: RenderInfo.cpp:46
#define LOG(tag)
Definition: Logger.h:285
RelAlgExecutionUnit exe_unit
void setNoExplainExecutionOptions(bool no_validation=false)
std::vector< std::shared_ptr< Analyzer::Expr > > target_exprs_owned_
static CompilationOptions makeCpuOnly(const CompilationOptions &in)
bool g_enable_watchdog
ExecutorDeviceType device_type
std::unordered_map< shared::TableKey, std::unordered_map< int, std::shared_ptr< const ColumnarResults >>> ColumnCacheMap
static void handlePersistentError(const int32_t error_code)
const size_t max_groups_buffer_entry_guess
#define CHECK(condition)
Definition: Logger.h:291
std::vector< InputTableInfo > get_table_infos(const std::vector< InputDescriptor > &input_descs, Executor *executor)
RelAlgExecutionUnit decide_approx_count_distinct_implementation(const RelAlgExecutionUnit &ra_exe_unit_in, const std::vector< InputTableInfo > &table_infos, const Executor *executor, const ExecutorDeviceType device_type_in, std::vector< std::shared_ptr< Analyzer::Expr >> &target_exprs_owned)
Executor * executor_
#define VLOG(n)
Definition: Logger.h:388

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void RelAlgExecutor::handlePersistentError ( const int32_t  error_code)
staticprivate

Definition at line 4261 of file RelAlgExecutor.cpp.

References Executor::ERR_OUT_OF_GPU_MEM, logger::ERROR, g_allow_cpu_retry, getErrorMessageFromCode(), logger::INFO, and LOG.

Referenced by executeTableFunction(), executeWorkUnit(), and handleOutOfMemoryRetry().

4261  {
4262  LOG(ERROR) << "Query execution failed with error "
4265  // We ran out of GPU memory, this doesn't count as an error if the query is
4266  // allowed to continue on CPU because retry on CPU is explicitly allowed through
4267  // --allow-cpu-retry.
4268  LOG(INFO) << "Query ran out of GPU memory, attempting punt to CPU";
4269  if (!g_allow_cpu_retry) {
4270  throw std::runtime_error(
4271  "Query ran out of GPU memory, unable to automatically retry on CPU");
4272  }
4273  return;
4274  }
4275  throw std::runtime_error(getErrorMessageFromCode(error_code));
4276 }
#define LOG(tag)
Definition: Logger.h:285
static const int32_t ERR_OUT_OF_GPU_MEM
Definition: Execute.h:1616
def error_code
Definition: report.py:244
bool g_allow_cpu_retry
Definition: Execute.cpp:89
static std::string getErrorMessageFromCode(const int32_t error_code)

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

bool RelAlgExecutor::hasDeletedRowInQuery ( std::vector< InputTableInfo > const &  input_tables_info) const
private

Definition at line 4016 of file RelAlgExecutor.cpp.

References anonymous_namespace{QueryMemoryDescriptor.cpp}::any_of(), CHECK, Catalog_Namespace::SysCatalog::getCatalog(), and Catalog_Namespace::SysCatalog::instance().

Referenced by getFilteredCountAll().

4017  {
4018  return std::any_of(
4019  input_tables_info.begin(), input_tables_info.end(), [](InputTableInfo const& info) {
4020  auto const& table_key = info.table_key;
4021  if (table_key.db_id > 0) {
4022  auto catalog =
4024  CHECK(catalog);
4025  auto td = catalog->getMetadataForTable(table_key.table_id);
4026  CHECK(td);
4027  if (catalog->getDeletedColumnIfRowsDeleted(td)) {
4028  return true;
4029  }
4030  }
4031  return false;
4032  });
4033 }
static SysCatalog & instance()
Definition: SysCatalog.h:343
std::shared_ptr< Catalog > getCatalog(const std::string &dbName)
#define CHECK(condition)
Definition: Logger.h:291
bool any_of(std::vector< Analyzer::Expr * > const &target_exprs)

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

bool RelAlgExecutor::hasStepForUnion ( ) const
inlineprivate

Definition at line 431 of file RelAlgExecutor.h.

References has_step_for_union_.

Referenced by canUseResultsetCache(), executeSort(), executeTableFunction(), and executeWorkUnit().

431 { return has_step_for_union_; }

+ Here is the caller graph for this function:

void RelAlgExecutor::initializeParallelismHints ( )
private

Definition at line 5484 of file RelAlgExecutor.cpp.

References executor_.

Referenced by RelAlgExecutor().

5484  {
5485  if (auto foreign_storage_mgr =
5486  executor_->getDataMgr()->getPersistentStorageMgr()->getForeignStorageMgr()) {
5487  // Parallelism hints need to be reset to empty so that we don't accidentally re-use
5488  // them. This can cause attempts to fetch strings that do not shard to the correct
5489  // node in distributed mode.
5490  foreign_storage_mgr->setParallelismHints({});
5491  }
5492 }
Executor * executor_

+ Here is the caller graph for this function:

bool RelAlgExecutor::isRowidLookup ( const WorkUnit work_unit)
private

Definition at line 4111 of file RelAlgExecutor.cpp.

References CHECK_EQ, RelAlgExecutor::WorkUnit::exe_unit, get_column_descriptor(), Analyzer::BinOper::get_left_operand(), RelAlgExecutionUnit::input_descs, kEQ, and TABLE.

Referenced by executeWorkUnit().

4111  {
4112  const auto& ra_exe_unit = work_unit.exe_unit;
4113  if (ra_exe_unit.input_descs.size() != 1) {
4114  return false;
4115  }
4116  const auto& table_desc = ra_exe_unit.input_descs.front();
4117  if (table_desc.getSourceType() != InputSourceType::TABLE) {
4118  return false;
4119  }
4120  const auto& table_key = table_desc.getTableKey();
4121  for (const auto& simple_qual : ra_exe_unit.simple_quals) {
4122  const auto comp_expr =
4123  std::dynamic_pointer_cast<const Analyzer::BinOper>(simple_qual);
4124  if (!comp_expr || comp_expr->get_optype() != kEQ) {
4125  return false;
4126  }
4127  const auto lhs = comp_expr->get_left_operand();
4128  const auto lhs_col = dynamic_cast<const Analyzer::ColumnVar*>(lhs);
4129  if (!lhs_col || !lhs_col->getTableKey().table_id || lhs_col->get_rte_idx()) {
4130  return false;
4131  }
4132  const auto rhs = comp_expr->get_right_operand();
4133  const auto rhs_const = dynamic_cast<const Analyzer::Constant*>(rhs);
4134  if (!rhs_const) {
4135  return false;
4136  }
4137  const auto cd = get_column_descriptor(
4138  {table_key.db_id, table_key.table_id, lhs_col->getColumnKey().column_id});
4139  if (cd->isVirtualCol) {
4140  CHECK_EQ("rowid", cd->columnName);
4141  return true;
4142  }
4143  }
4144  return false;
4145 }
#define CHECK_EQ(x, y)
Definition: Logger.h:301
Definition: sqldefs.h:29
const ColumnDescriptor * get_column_descriptor(const shared::ColumnKey &column_key)
Definition: Execute.h:213
const Expr * get_left_operand() const
Definition: Analyzer.h:455

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

std::list< std::shared_ptr< Analyzer::Expr > > RelAlgExecutor::makeJoinQuals ( const RexScalar join_condition,
const std::vector< JoinType > &  join_types,
const std::unordered_map< const RelAlgNode *, int > &  input_to_nest_level,
const bool  just_explain 
) const
private

Definition at line 4765 of file RelAlgExecutor.cpp.

References combine_equi_join_conditions(), executor_, fold_expr(), anonymous_namespace{RelAlgExecutor.cpp}::get_bitwise_equals_conjunction(), now_, qual_to_conjunctive_form(), query_state_, anonymous_namespace{RelAlgExecutor.cpp}::reverse_logical_distribution(), anonymous_namespace{RelAlgExecutor.cpp}::rex_to_conjunctive_form(), and RelAlgTranslator::translate().

Referenced by translateLeftDeepJoinFilter().

4769  {
4770  RelAlgTranslator translator(
4771  query_state_, executor_, input_to_nest_level, join_types, now_, just_explain);
4772  const auto rex_condition_cf = rex_to_conjunctive_form(join_condition);
4773  std::list<std::shared_ptr<Analyzer::Expr>> join_condition_quals;
4774  for (const auto rex_condition_component : rex_condition_cf) {
4775  const auto bw_equals = get_bitwise_equals_conjunction(rex_condition_component);
4776  const auto join_condition = reverse_logical_distribution(
4777  translator.translate(bw_equals ? bw_equals.get() : rex_condition_component));
4778  auto join_condition_cf = qual_to_conjunctive_form(join_condition);
4779 
4780  auto append_folded_cf_quals = [&join_condition_quals](const auto& cf_quals) {
4781  for (const auto& cf_qual : cf_quals) {
4782  join_condition_quals.emplace_back(fold_expr(cf_qual.get()));
4783  }
4784  };
4785 
4786  append_folded_cf_quals(join_condition_cf.quals);
4787  append_folded_cf_quals(join_condition_cf.simple_quals);
4788  }
4789  return combine_equi_join_conditions(join_condition_quals);
4790 }
std::unique_ptr< const RexOperator > get_bitwise_equals_conjunction(const RexScalar *scalar)
QualsConjunctiveForm qual_to_conjunctive_form(const std::shared_ptr< Analyzer::Expr > qual_expr)
std::vector< const RexScalar * > rex_to_conjunctive_form(const RexScalar *qual_expr)
std::shared_ptr< Analyzer::Expr > reverse_logical_distribution(const std::shared_ptr< Analyzer::Expr > &expr)
std::shared_ptr< const query_state::QueryState > query_state_
Executor * executor_
std::list< std::shared_ptr< Analyzer::Expr > > combine_equi_join_conditions(const std::list< std::shared_ptr< Analyzer::Expr >> &join_quals)
std::shared_ptr< Analyzer::Expr > fold_expr(const Analyzer::Expr *expr)

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void RelAlgExecutor::prepareForeignTable ( )
void RelAlgExecutor::prepareForeignTables ( )

Definition at line 5501 of file RelAlgExecutor.cpp.

References anonymous_namespace{RelAlgExecutor.cpp}::prepare_foreign_table_for_execution(), and query_dag_.

5501  {
5502  const auto& ra = query_dag_->getRootNode();
5504 }
std::unique_ptr< RelAlgDag > query_dag_
void prepare_foreign_table_for_execution(const RelAlgNode &ra_node)

+ Here is the call graph for this function:

void RelAlgExecutor::prepareForSystemTableExecution ( const CompilationOptions co) const

Definition at line 5510 of file RelAlgExecutor.cpp.

References getRootRelAlgNode(), and anonymous_namespace{RelAlgExecutor.cpp}::prepare_for_system_table_execution().

5510  {
5512 }
const RelAlgNode & getRootRelAlgNode() const
void prepare_for_system_table_execution(const RelAlgNode &ra_node, const CompilationOptions &co)

+ Here is the call graph for this function:

void RelAlgExecutor::prepareLeafExecution ( const AggregatedColRange agg_col_range,
const StringDictionaryGenerations string_dictionary_generations,
const TableGenerations table_generations 
)

Definition at line 872 of file RelAlgExecutor.cpp.

References cpu_threads(), executor_, g_enable_dynamic_watchdog, Executor::getArenaBlockSize(), queue_time_ms_, timer_start(), and timer_stop().

875  {
876  // capture the lock acquisition time
877  auto clock_begin = timer_start();
879  executor_->resetInterrupt();
880  }
881  queue_time_ms_ = timer_stop(clock_begin);
882  executor_->row_set_mem_owner_ = std::make_shared<RowSetMemoryOwner>(
884  executor_->row_set_mem_owner_->setDictionaryGenerations(string_dictionary_generations);
885  executor_->table_generations_ = table_generations;
886  executor_->agg_col_range_cache_ = agg_col_range;
887 }
int64_t queue_time_ms_
TypeR::rep timer_stop(Type clock_begin)
Definition: measure.h:48
bool g_enable_dynamic_watchdog
Definition: Execute.cpp:81
int cpu_threads()
Definition: thread_count.h:25
static size_t getArenaBlockSize()
Definition: Execute.cpp:558
Executor * executor_
Type timer_start()
Definition: measure.h:42

+ Here is the call graph for this function:

std::vector< PushedDownFilterInfo > RelAlgExecutor::selectFiltersToBePushedDown ( const RelAlgExecutor::WorkUnit work_unit,
const CompilationOptions co,
const ExecutionOptions eo 
)
private

Goes through all candidate filters and evaluates whether they pass the selectivity criteria or not.

Definition at line 127 of file JoinFilterPushDown.cpp.

References RelAlgExecutor::WorkUnit::exe_unit, executor_, find_push_down_filters(), get_table_infos(), getFilterSelectivity(), RelAlgExecutionUnit::input_descs, RelAlgExecutor::WorkUnit::input_permutation, RelAlgExecutor::WorkUnit::left_deep_join_input_sizes, and to_gather_info_for_filter_selectivity().

Referenced by executeWorkUnit().

130  {
131  const auto all_push_down_candidates =
133  work_unit.input_permutation,
134  work_unit.left_deep_join_input_sizes);
135  std::vector<PushedDownFilterInfo> selective_push_down_candidates;
136  const auto ti = get_table_infos(work_unit.exe_unit.input_descs, executor_);
138  for (const auto& candidate : all_push_down_candidates) {
139  const auto selectivity = getFilterSelectivity(candidate.filter_expressions, co, eo);
140  if (selectivity.is_valid && selectivity.isFilterSelectiveEnough()) {
141  selective_push_down_candidates.push_back(candidate);
142  }
143  }
144  }
145  return selective_push_down_candidates;
146 }
const std::vector< size_t > left_deep_join_input_sizes
RelAlgExecutionUnit exe_unit
FilterSelectivity getFilterSelectivity(const std::vector< std::shared_ptr< Analyzer::Expr >> &filter_expressions, const CompilationOptions &co, const ExecutionOptions &eo)
std::vector< InputDescriptor > input_descs
bool to_gather_info_for_filter_selectivity(const std::vector< InputTableInfo > &table_infos)
std::vector< PushedDownFilterInfo > find_push_down_filters(const RelAlgExecutionUnit &ra_exe_unit, const std::vector< size_t > &input_permutation, const std::vector< size_t > &left_deep_join_input_sizes)
const std::vector< size_t > input_permutation
std::vector< InputTableInfo > get_table_infos(const std::vector< InputDescriptor > &input_descs, Executor *executor)
Executor * executor_

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void RelAlgExecutor::setHasStepForUnion ( bool  flag)
inlineprivate

Definition at line 429 of file RelAlgExecutor.h.

References has_step_for_union_.

Referenced by executeRelAlgStep().

429 { has_step_for_union_ = flag; }

+ Here is the caller graph for this function:

void RelAlgExecutor::setupCaching ( const RelAlgNode ra)
private

Definition at line 5494 of file RelAlgExecutor.cpp.

References CHECK, executor_, anonymous_namespace{RelAlgExecutor.cpp}::get_physical_inputs_with_spi_col_id(), and get_physical_table_inputs().

Referenced by executeRelAlgQueryNoRetry(), and getOuterFragmentCount().

5494  {
5495  CHECK(executor_);
5496  const auto phys_inputs = get_physical_inputs_with_spi_col_id(ra);
5497  const auto phys_table_ids = get_physical_table_inputs(ra);
5498  executor_->setupCaching(phys_inputs, phys_table_ids);
5499 }
std::unordered_set< shared::TableKey > get_physical_table_inputs(const RelAlgNode *ra)
#define CHECK(condition)
Definition: Logger.h:291
std::unordered_set< PhysicalInput > get_physical_inputs_with_spi_col_id(const RelAlgNode *ra)
Executor * executor_

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

JoinQualsPerNestingLevel RelAlgExecutor::translateLeftDeepJoinFilter ( const RelLeftDeepInnerJoin *  join,
const std::vector< InputDescriptor > &  input_descs,
const std::unordered_map< const RelAlgNode *, int > &  input_to_nest_level,
const bool  just_explain 
)
private

Definition at line 4795 of file RelAlgExecutor.cpp.

References ANTI, CHECK, CHECK_LE, RelLeftDeepInnerJoin::getInnerCondition(), RelLeftDeepInnerJoin::getOuterCondition(), INNER, LEFT, anonymous_namespace{RelAlgExecutor.cpp}::left_deep_join_types(), makeJoinQuals(), run_benchmark_import::result, and SEMI.

Referenced by createCompoundWorkUnit(), and createProjectWorkUnit().

4799  {
4800  const auto join_types = left_deep_join_types(join);
4801  const auto join_condition_quals = makeJoinQuals(
4802  join->getInnerCondition(), join_types, input_to_nest_level, just_explain);
4803  MaxRangeTableIndexVisitor rte_idx_visitor;
4804  JoinQualsPerNestingLevel result(input_descs.size() - 1);
4805  std::unordered_set<std::shared_ptr<Analyzer::Expr>> visited_quals;
4806  for (size_t rte_idx = 1; rte_idx < input_descs.size(); ++rte_idx) {
4807  const auto outer_condition = join->getOuterCondition(rte_idx);
4808  if (outer_condition) {
4809  result[rte_idx - 1].quals =
4810  makeJoinQuals(outer_condition, join_types, input_to_nest_level, just_explain);
4811  CHECK_LE(rte_idx, join_types.size());
4812  CHECK(join_types[rte_idx - 1] == JoinType::LEFT);
4813  result[rte_idx - 1].type = JoinType::LEFT;
4814  continue;
4815  }
4816  for (const auto& qual : join_condition_quals) {
4817  if (visited_quals.count(qual)) {
4818  continue;
4819  }
4820  const auto qual_rte_idx = rte_idx_visitor.visit(qual.get());
4821  if (static_cast<size_t>(qual_rte_idx) <= rte_idx) {
4822  const auto it_ok = visited_quals.emplace(qual);
4823  CHECK(it_ok.second);
4824  result[rte_idx - 1].quals.push_back(qual);
4825  }
4826  }
4827  CHECK_LE(rte_idx, join_types.size());
4828  CHECK(join_types[rte_idx - 1] == JoinType::INNER ||
4829  join_types[rte_idx - 1] == JoinType::SEMI ||
4830  join_types[rte_idx - 1] == JoinType::ANTI);
4831  result[rte_idx - 1].type = join_types[rte_idx - 1];
4832  }
4833  return result;
4834 }
std::vector< JoinType > left_deep_join_types(const RelLeftDeepInnerJoin *left_deep_join)
const RexScalar * getOuterCondition(const size_t nesting_level) const
std::vector< JoinCondition > JoinQualsPerNestingLevel
#define CHECK_LE(x, y)
Definition: Logger.h:304
const RexScalar * getInnerCondition() const
#define CHECK(condition)
Definition: Logger.h:291
std::list< std::shared_ptr< Analyzer::Expr > > makeJoinQuals(const RexScalar *join_condition, const std::vector< JoinType > &join_types, const std::unordered_map< const RelAlgNode *, int > &input_to_nest_level, const bool just_explain) const

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

Friends And Related Function Documentation

friend class PendingExecutionClosure
friend

Definition at line 452 of file RelAlgExecutor.h.

Member Data Documentation

std::unique_ptr<TransactionParameters> RelAlgExecutor::dml_transaction_parameters_
private

Definition at line 449 of file RelAlgExecutor.h.

Referenced by executeDelete(), and executeUpdate().

bool RelAlgExecutor::has_step_for_union_
private

Definition at line 446 of file RelAlgExecutor.h.

Referenced by hasStepForUnion(), and setHasStepForUnion().

std::unordered_map<unsigned, AggregatedResult> RelAlgExecutor::leaf_results_
private

Definition at line 444 of file RelAlgExecutor.h.

Referenced by addLeafResult(), executeSort(), executeUpdate(), and executeWorkUnit().

std::unordered_map<unsigned, JoinQualsPerNestingLevel> RelAlgExecutor::left_deep_join_info_
private

std::optional<std::function<void()> > RelAlgExecutor::post_execution_callback_
private

int64_t RelAlgExecutor::queue_time_ms_
private

Definition at line 445 of file RelAlgExecutor.h.

Referenced by executeRelAlgQuerySingleStep(), and prepareLeafExecution().

SpeculativeTopNBlacklist RelAlgExecutor::speculative_topn_blacklist_
static private

Definition at line 447 of file RelAlgExecutor.h.

Referenced by createSortInputWorkUnit(), and executeSort().


The documentation for this class was generated from the following files: