OmniSciDB  ba1bac9284
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
RelAlgExecutor Class Reference

#include <RelAlgExecutor.h>

+ Inheritance diagram for RelAlgExecutor:
+ Collaboration diagram for RelAlgExecutor:

Classes

struct  TableFunctionWorkUnit
 
struct  WorkUnit
 

Public Types

using TargetInfoList = std::vector< TargetInfo >
 

Public Member Functions

 RelAlgExecutor (Executor *executor, const Catalog_Namespace::Catalog &cat, std::shared_ptr< const query_state::QueryState > query_state=nullptr)
 
 RelAlgExecutor (Executor *executor, const Catalog_Namespace::Catalog &cat, const std::string &query_ra, std::shared_ptr< const query_state::QueryState > query_state=nullptr)
 
 RelAlgExecutor (Executor *executor, const Catalog_Namespace::Catalog &cat, std::unique_ptr< RelAlgDagBuilder > query_dag, std::shared_ptr< const query_state::QueryState > query_state=nullptr)
 
size_t getOuterFragmentCount (const CompilationOptions &co, const ExecutionOptions &eo)
 
ExecutionResult executeRelAlgQuery (const CompilationOptions &co, const ExecutionOptions &eo, const bool just_explain_plan, RenderInfo *render_info)
 
ExecutionResult executeRelAlgQueryWithFilterPushDown (const RaExecutionSequence &seq, const CompilationOptions &co, const ExecutionOptions &eo, RenderInfo *render_info, const int64_t queue_time_ms)
 
void prepareLeafExecution (const AggregatedColRange &agg_col_range, const StringDictionaryGenerations &string_dictionary_generations, const TableGenerations &table_generations)
 
ExecutionResult executeRelAlgSeq (const RaExecutionSequence &seq, const CompilationOptions &co, const ExecutionOptions &eo, RenderInfo *render_info, const int64_t queue_time_ms, const bool with_existing_temp_tables=false)
 
ExecutionResult executeRelAlgSubSeq (const RaExecutionSequence &seq, const std::pair< size_t, size_t > interval, const CompilationOptions &co, const ExecutionOptions &eo, RenderInfo *render_info, const int64_t queue_time_ms)
 
QueryStepExecutionResult executeRelAlgQuerySingleStep (const RaExecutionSequence &seq, const size_t step_idx, const CompilationOptions &co, const ExecutionOptions &eo, RenderInfo *render_info)
 
void addLeafResult (const unsigned id, const AggregatedResult &result)
 
const RelAlgNode & getRootRelAlgNode () const
 
const std::vector
< std::shared_ptr< RexSubQuery > > & 
getSubqueries () const noexcept
 
RegisteredQueryHint getParsedQueryHints ()
 
ExecutionResult executeSimpleInsert (const Analyzer::Query &insert_query)
 
AggregatedColRange computeColRangesCache ()
 
StringDictionaryGenerations computeStringDictionaryGenerations ()
 
TableGenerations computeTableGenerations ()
 
Executor * getExecutor () const
 
void cleanupPostExecution ()
 
void executePostExecutionCallback ()
 

Static Public Member Functions

static std::string getErrorMessageFromCode (const int32_t error_code)
 

Private Member Functions

ExecutionResult executeRelAlgQueryNoRetry (const CompilationOptions &co, const ExecutionOptions &eo, const bool just_explain_plan, RenderInfo *render_info)
 
void executeRelAlgStep (const RaExecutionSequence &seq, const size_t step_idx, const CompilationOptions &, const ExecutionOptions &, RenderInfo *, const int64_t queue_time_ms)
 
void executeUpdate (const RelAlgNode *node, const CompilationOptions &co, const ExecutionOptions &eo, const int64_t queue_time_ms)
 
void executeDelete (const RelAlgNode *node, const CompilationOptions &co, const ExecutionOptions &eo_in, const int64_t queue_time_ms)
 
ExecutionResult executeCompound (const RelCompound *, const CompilationOptions &, const ExecutionOptions &, RenderInfo *, const int64_t queue_time_ms)
 
ExecutionResult executeAggregate (const RelAggregate *aggregate, const CompilationOptions &co, const ExecutionOptions &eo, RenderInfo *render_info, const int64_t queue_time_ms)
 
ExecutionResult executeProject (const RelProject *, const CompilationOptions &, const ExecutionOptions &, RenderInfo *, const int64_t queue_time_ms, const std::optional< size_t > previous_count)
 
ExecutionResult executeTableFunction (const RelTableFunction *, const CompilationOptions &, const ExecutionOptions &, const int64_t queue_time_ms)
 
void computeWindow (const RelAlgExecutionUnit &ra_exe_unit, const CompilationOptions &co, const ExecutionOptions &eo, ColumnCacheMap &column_cache_map, const int64_t queue_time_ms)
 
std::unique_ptr
< WindowFunctionContext > 
createWindowFunctionContext (const Analyzer::WindowFunction *window_func, const std::shared_ptr< Analyzer::BinOper > &partition_key_cond, const RelAlgExecutionUnit &ra_exe_unit, const std::vector< InputTableInfo > &query_infos, const CompilationOptions &co, ColumnCacheMap &column_cache_map, std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner)
 
ExecutionResult executeFilter (const RelFilter *, const CompilationOptions &, const ExecutionOptions &, RenderInfo *, const int64_t queue_time_ms)
 
ExecutionResult executeSort (const RelSort *, const CompilationOptions &, const ExecutionOptions &, RenderInfo *, const int64_t queue_time_ms)
 
ExecutionResult executeLogicalValues (const RelLogicalValues *, const ExecutionOptions &)
 
ExecutionResult executeModify (const RelModify *modify, const ExecutionOptions &eo)
 
ExecutionResult executeUnion (const RelLogicalUnion *, const RaExecutionSequence &, const CompilationOptions &, const ExecutionOptions &, RenderInfo *, const int64_t queue_time_ms)
 
WorkUnit createSortInputWorkUnit (const RelSort *, const ExecutionOptions &eo)
 
ExecutionResult executeWorkUnit (const WorkUnit &work_unit, const std::vector< TargetMetaInfo > &targets_meta, const bool is_agg, const CompilationOptions &co_in, const ExecutionOptions &eo, RenderInfo *, const int64_t queue_time_ms, const std::optional< size_t > previous_count=std::nullopt)
 
size_t getNDVEstimation (const WorkUnit &work_unit, const int64_t range, const bool is_agg, const CompilationOptions &co, const ExecutionOptions &eo)
 
std::optional< size_t > getFilteredCountAll (const WorkUnit &work_unit, const bool is_agg, const CompilationOptions &co, const ExecutionOptions &eo)
 
FilterSelectivity getFilterSelectivity (const std::vector< std::shared_ptr< Analyzer::Expr >> &filter_expressions, const CompilationOptions &co, const ExecutionOptions &eo)
 
std::vector< PushedDownFilterInfo > selectFiltersToBePushedDown (const RelAlgExecutor::WorkUnit &work_unit, const CompilationOptions &co, const ExecutionOptions &eo)
 
bool isRowidLookup (const WorkUnit &work_unit)
 
ExecutionResult handleOutOfMemoryRetry (const RelAlgExecutor::WorkUnit &work_unit, const std::vector< TargetMetaInfo > &targets_meta, const bool is_agg, const CompilationOptions &co, const ExecutionOptions &eo, RenderInfo *render_info, const bool was_multifrag_kernel_launch, const int64_t queue_time_ms)
 
WorkUnit createWorkUnit (const RelAlgNode *, const SortInfo &, const ExecutionOptions &eo)
 
WorkUnit createCompoundWorkUnit (const RelCompound *, const SortInfo &, const ExecutionOptions &eo)
 
WorkUnit createAggregateWorkUnit (const RelAggregate *, const SortInfo &, const bool just_explain)
 
WorkUnit createProjectWorkUnit (const RelProject *, const SortInfo &, const ExecutionOptions &eo)
 
WorkUnit createFilterWorkUnit (const RelFilter *, const SortInfo &, const bool just_explain)
 
WorkUnit createJoinWorkUnit (const RelJoin *, const SortInfo &, const bool just_explain)
 
WorkUnit createUnionWorkUnit (const RelLogicalUnion *, const SortInfo &, const ExecutionOptions &eo)
 
TableFunctionWorkUnit createTableFunctionWorkUnit (const RelTableFunction *table_func, const bool just_explain, const bool is_gpu)
 
void addTemporaryTable (const int table_id, const ResultSetPtr &result)
 
void eraseFromTemporaryTables (const int table_id)
 
void handleNop (RaExecutionDesc &ed)
 
JoinQualsPerNestingLevel translateLeftDeepJoinFilter (const RelLeftDeepInnerJoin *join, const std::vector< InputDescriptor > &input_descs, const std::unordered_map< const RelAlgNode *, int > &input_to_nest_level, const bool just_explain)
 
std::list< std::shared_ptr
< Analyzer::Expr > > 
makeJoinQuals (const RexScalar *join_condition, const std::vector< JoinType > &join_types, const std::unordered_map< const RelAlgNode *, int > &input_to_nest_level, const bool just_explain) const
 
- Private Member Functions inherited from StorageIOFacility
 StorageIOFacility (Executor *executor, Catalog_Namespace::Catalog const &catalog)
 
StorageIOFacility::UpdateCallback yieldUpdateCallback (UpdateTransactionParameters &update_parameters)
 
StorageIOFacility::UpdateCallback yieldDeleteCallback (DeleteTransactionParameters &delete_parameters)
 

Static Private Member Functions

static void handlePersistentError (const int32_t error_code)
 

Private Attributes

Executor * executor_
 
const Catalog_Namespace::Catalog & cat_
 
std::unique_ptr< RelAlgDagBuilder > query_dag_
 
std::shared_ptr< const
query_state::QueryState > 
query_state_
 
TemporaryTables temporary_tables_
 
time_t now_
 
std::vector< std::shared_ptr
< Analyzer::Expr > > 
target_exprs_owned_
 
std::unordered_map< unsigned,
AggregatedResult > 
leaf_results_
 
int64_t queue_time_ms_
 
std::unique_ptr
< TransactionParameters > 
dml_transaction_parameters_
 
std::optional< std::function
< void()> > 
post_execution_callback_
 

Static Private Attributes

static SpeculativeTopNBlacklist speculative_topn_blacklist_
 

Friends

class PendingExecutionClosure
 

Additional Inherited Members

- Private Types inherited from StorageIOFacility
using UpdateCallback = UpdateLogForFragment::Callback
 
using TableDescriptorType = TableDescriptor
 
using DeleteVictimOffsetList = std::vector< uint64_t >
 
using UpdateTargetOffsetList = std::vector< uint64_t >
 
using UpdateTargetTypeList = std::vector< TargetMetaInfo >
 
using UpdateTargetColumnNamesList = std::vector< std::string >
 
using TransactionLog = Fragmenter_Namespace::InsertOrderFragmenter::ModifyTransactionTracker
 
using TransactionLogPtr = std::unique_ptr< TransactionLog >
 
using ColumnValidationFunction = std::function< bool(std::string const &)>
 

Detailed Description

Definition at line 48 of file RelAlgExecutor.h.

Member Typedef Documentation

using RelAlgExecutor::TargetInfoList = std::vector< TargetInfo >

Definition at line 50 of file RelAlgExecutor.h.

Constructor & Destructor Documentation

RelAlgExecutor::RelAlgExecutor ( Executor * executor,
const Catalog_Namespace::Catalog & cat,
std::shared_ptr< const query_state::QueryState > query_state = nullptr 
)
inline

Definition at line 52 of file RelAlgExecutor.h.

55  : StorageIOFacility(executor, cat)
56  , executor_(executor)
57  , cat_(cat)
58  , query_state_(std::move(query_state))
59  , now_(0)
60  , queue_time_ms_(0) {}
int64_t queue_time_ms_
StorageIOFacility(Executor *executor, Catalog_Namespace::Catalog const &catalog)
const Catalog_Namespace::Catalog & cat_
std::shared_ptr< const query_state::QueryState > query_state_
Executor * executor_
RelAlgExecutor::RelAlgExecutor ( Executor * executor,
const Catalog_Namespace::Catalog & cat,
const std::string &  query_ra,
std::shared_ptr< const query_state::QueryState > query_state = nullptr 
)
inline

Definition at line 62 of file RelAlgExecutor.h.

66  : StorageIOFacility(executor, cat)
67  , executor_(executor)
68  , cat_(cat)
69  , query_dag_(std::make_unique<RelAlgDagBuilder>(query_ra, cat_, nullptr))
70  , query_state_(std::move(query_state))
71  , now_(0)
72  , queue_time_ms_(0) {}
int64_t queue_time_ms_
StorageIOFacility(Executor *executor, Catalog_Namespace::Catalog const &catalog)
const Catalog_Namespace::Catalog & cat_
std::shared_ptr< const query_state::QueryState > query_state_
std::unique_ptr< RelAlgDagBuilder > query_dag_
Executor * executor_
RelAlgExecutor::RelAlgExecutor ( Executor * executor,
const Catalog_Namespace::Catalog & cat,
std::unique_ptr< RelAlgDagBuilder > query_dag,
std::shared_ptr< const query_state::QueryState > query_state = nullptr 
)
inline

Definition at line 74 of file RelAlgExecutor.h.

78  : StorageIOFacility(executor, cat)
79  , executor_(executor)
80  , cat_(cat)
81  , query_dag_(std::move(query_dag))
82  , query_state_(std::move(query_state))
83  , now_(0)
84  , queue_time_ms_(0) {}
int64_t queue_time_ms_
StorageIOFacility(Executor *executor, Catalog_Namespace::Catalog const &catalog)
const Catalog_Namespace::Catalog & cat_
std::shared_ptr< const query_state::QueryState > query_state_
std::unique_ptr< RelAlgDagBuilder > query_dag_
Executor * executor_

Member Function Documentation

void RelAlgExecutor::addLeafResult ( const unsigned  id,
const AggregatedResult & result 
)
inline

Definition at line 124 of file RelAlgExecutor.h.

References CHECK, and leaf_results_.

124  {
125  const auto it_ok = leaf_results_.emplace(id, result);
126  CHECK(it_ok.second);
127  }
std::unordered_map< unsigned, AggregatedResult > leaf_results_
#define CHECK(condition)
Definition: Logger.h:206
void RelAlgExecutor::addTemporaryTable ( const int  table_id,
const ResultSetPtr & result 
)
inline private

Definition at line 335 of file RelAlgExecutor.h.

References CHECK, CHECK_LT, and temporary_tables_.

Referenced by executeRelAlgStep(), and handleNop().

335  {
336  CHECK_LT(size_t(0), result->colCount());
337  CHECK_LT(table_id, 0);
338  const auto it_ok = temporary_tables_.emplace(table_id, result);
339  CHECK(it_ok.second);
340  }
TemporaryTables temporary_tables_
#define CHECK_LT(x, y)
Definition: Logger.h:216
#define CHECK(condition)
Definition: Logger.h:206

+ Here is the caller graph for this function:

void RelAlgExecutor::cleanupPostExecution ( )

Definition at line 483 of file RelAlgExecutor.cpp.

References CHECK, and executor_.

Referenced by executeRelAlgQueryNoRetry(), and getOuterFragmentCount().

483  {
484  CHECK(executor_);
485  executor_->row_set_mem_owner_ = nullptr;
486 }
#define CHECK(condition)
Definition: Logger.h:206
Executor * executor_

+ Here is the caller graph for this function:

AggregatedColRange RelAlgExecutor::computeColRangesCache ( )

Definition at line 463 of file RelAlgExecutor.cpp.

References cat_, executor_, get_physical_inputs(), and getRootRelAlgNode().

463  {
464  AggregatedColRange agg_col_range_cache;
465  const auto phys_inputs = get_physical_inputs(cat_, &getRootRelAlgNode());
466  return executor_->computeColRangesCache(phys_inputs);
467 }
const Catalog_Namespace::Catalog & cat_
const RelAlgNode & getRootRelAlgNode() const
std::unordered_set< PhysicalInput > get_physical_inputs(const RelAlgNode *ra)
Executor * executor_

+ Here is the call graph for this function:

StringDictionaryGenerations RelAlgExecutor::computeStringDictionaryGenerations ( )

Definition at line 469 of file RelAlgExecutor.cpp.

References cat_, executor_, get_physical_inputs(), and getRootRelAlgNode().

469  {
470  const auto phys_inputs = get_physical_inputs(cat_, &getRootRelAlgNode());
471  return executor_->computeStringDictionaryGenerations(phys_inputs);
472 }
const Catalog_Namespace::Catalog & cat_
const RelAlgNode & getRootRelAlgNode() const
std::unordered_set< PhysicalInput > get_physical_inputs(const RelAlgNode *ra)
Executor * executor_

+ Here is the call graph for this function:

TableGenerations RelAlgExecutor::computeTableGenerations ( )

Definition at line 474 of file RelAlgExecutor.cpp.

References executor_, get_physical_table_inputs(), and getRootRelAlgNode().

474  {
475  const auto phys_table_ids = get_physical_table_inputs(&getRootRelAlgNode());
476  return executor_->computeTableGenerations(phys_table_ids);
477 }
const RelAlgNode & getRootRelAlgNode() const
Executor * executor_
std::unordered_set< int > get_physical_table_inputs(const RelAlgNode *ra)

+ Here is the call graph for this function:

void RelAlgExecutor::computeWindow ( const RelAlgExecutionUnit & ra_exe_unit,
const CompilationOptions & co,
const ExecutionOptions & eo,
ColumnCacheMap & column_cache_map,
const int64_t  queue_time_ms 
)
private

Definition at line 1879 of file RelAlgExecutor.cpp.

References CHECK_EQ, WindowProjectNodeContext::create(), createWindowFunctionContext(), executor_, ExecutionOptions::executor_type, Extern, get_table_infos(), Analyzer::WindowFunction::getPartitionKeys(), RelAlgExecutionUnit::input_descs, kBOOLEAN, kBW_EQ, kONE, RelAlgExecutionUnit::target_exprs, and anonymous_namespace{RelAlgExecutor.cpp}::transform_to_inner().

Referenced by executeWorkUnit().

1883  {
1884  auto query_infos = get_table_infos(ra_exe_unit.input_descs, executor_);
1885  CHECK_EQ(query_infos.size(), size_t(1));
1886  if (query_infos.front().info.fragments.size() != 1) {
1887  throw std::runtime_error(
1888  "Only single fragment tables supported for window functions for now");
1889  }
1890  if (eo.executor_type == ::ExecutorType::Extern) {
1891  return;
1892  }
1893  query_infos.push_back(query_infos.front());
1894  auto window_project_node_context = WindowProjectNodeContext::create(executor_);
1895  for (size_t target_index = 0; target_index < ra_exe_unit.target_exprs.size();
1896  ++target_index) {
1897  const auto& target_expr = ra_exe_unit.target_exprs[target_index];
1898  const auto window_func = dynamic_cast<const Analyzer::WindowFunction*>(target_expr);
1899  if (!window_func) {
1900  continue;
1901  }
1902  // Always use baseline layout hash tables for now, make the expression a tuple.
1903  const auto& partition_keys = window_func->getPartitionKeys();
1904  std::shared_ptr<Analyzer::Expr> partition_key_tuple;
1905  if (partition_keys.size() > 1) {
1906  partition_key_tuple = makeExpr<Analyzer::ExpressionTuple>(partition_keys);
1907  } else {
1908  if (partition_keys.empty()) {
1909  throw std::runtime_error(
1910  "Empty window function partitions are not supported yet");
1911  }
1912  CHECK_EQ(partition_keys.size(), size_t(1));
1913  partition_key_tuple = partition_keys.front();
1914  }
1915  // Creates a tautology equality with the partition expression on both sides.
1916  const auto partition_key_cond =
1917  makeExpr<Analyzer::BinOper>(kBOOLEAN,
1918  kBW_EQ,
1919  kONE,
1920  partition_key_tuple,
1921  transform_to_inner(partition_key_tuple.get()));
1922  auto context = createWindowFunctionContext(window_func,
1923  partition_key_cond,
1924  ra_exe_unit,
1925  query_infos,
1926  co,
1927  column_cache_map,
1928  executor_->getRowSetMemoryOwner());
1929  context->compute();
1930  window_project_node_context->addWindowFunctionContext(std::move(context),
1931  target_index);
1932  }
1933 }
std::vector< Analyzer::Expr * > target_exprs
#define CHECK_EQ(x, y)
Definition: Logger.h:214
std::vector< InputDescriptor > input_descs
static WindowProjectNodeContext * create(Executor *executor)
std::shared_ptr< Analyzer::Expr > transform_to_inner(const Analyzer::Expr *expr)
ExecutorType executor_type
Definition: sqldefs.h:69
std::vector< InputTableInfo > get_table_infos(const std::vector< InputDescriptor > &input_descs, Executor *executor)
Definition: sqldefs.h:31
const std::vector< std::shared_ptr< Analyzer::Expr > > & getPartitionKeys() const
Definition: Analyzer.h:1451
Executor * executor_
std::unique_ptr< WindowFunctionContext > createWindowFunctionContext(const Analyzer::WindowFunction *window_func, const std::shared_ptr< Analyzer::BinOper > &partition_key_cond, const RelAlgExecutionUnit &ra_exe_unit, const std::vector< InputTableInfo > &query_infos, const CompilationOptions &co, ColumnCacheMap &column_cache_map, std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner)

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

RelAlgExecutor::WorkUnit RelAlgExecutor::createAggregateWorkUnit ( const RelAggregate * aggregate,
const SortInfo & sort_info,
const bool  just_explain 
)
private

Definition at line 3805 of file RelAlgExecutor.cpp.

References cat_, CHECK_EQ, RegisteredQueryHint::defaults(), executor_, g_default_max_groups_buffer_entry_guess, anonymous_namespace{RelAlgExecutor.cpp}::get_input_desc(), anonymous_namespace{RelAlgExecutor.cpp}::get_input_nest_levels(), anonymous_namespace{RelAlgExecutor.cpp}::get_join_type(), anonymous_namespace{RelAlgExecutor.cpp}::get_targets_meta(), RelAlgNode::getInput(), RelAlgNode::getOutputMetainfo(), RelAlgNode::inputCount(), now_, query_dag_, query_state_, RelAlgNode::setOutputMetainfo(), anonymous_namespace{RelAlgExecutor.cpp}::synthesize_inputs(), target_exprs_owned_, anonymous_namespace{RelAlgExecutor.cpp}::translate_groupby_exprs(), and anonymous_namespace{RelAlgExecutor.cpp}::translate_targets().

Referenced by createWorkUnit(), and executeAggregate().

3808  {
3809  std::vector<InputDescriptor> input_descs;
3810  std::list<std::shared_ptr<const InputColDescriptor>> input_col_descs;
3811  std::vector<std::shared_ptr<RexInput>> used_inputs_owned;
3812  const auto input_to_nest_level = get_input_nest_levels(aggregate, {});
3813  std::tie(input_descs, input_col_descs, used_inputs_owned) =
3814  get_input_desc(aggregate, input_to_nest_level, {}, cat_);
3815  const auto join_type = get_join_type(aggregate);
3816 
3817  RelAlgTranslator translator(cat_,
3818  query_state_,
3819  executor_,
3820  input_to_nest_level,
3821  {join_type},
3822  now_,
3823  just_explain);
3824  CHECK_EQ(size_t(1), aggregate->inputCount());
3825  const auto source = aggregate->getInput(0);
3826  const auto& in_metainfo = source->getOutputMetainfo();
3827  const auto scalar_sources =
3828  synthesize_inputs(aggregate, size_t(0), in_metainfo, input_to_nest_level);
3829  const auto groupby_exprs = translate_groupby_exprs(aggregate, scalar_sources);
3830  const auto target_exprs = translate_targets(
3831  target_exprs_owned_, scalar_sources, groupby_exprs, aggregate, translator);
3832  const auto targets_meta = get_targets_meta(aggregate, target_exprs);
3833  aggregate->setOutputMetainfo(targets_meta);
3834  return {RelAlgExecutionUnit{
3835  input_descs,
3836  input_col_descs,
3837  {},
3838  {},
3839  {},
3840  groupby_exprs,
3841  target_exprs,
3842  nullptr,
3843  sort_info,
3844  0,
3845  query_dag_ ? query_dag_->getQueryHints() : RegisteredQueryHint::defaults(),
3846  false,
3847  std::nullopt,
3848  query_state_},
3849  aggregate,
3850  g_default_max_groups_buffer_entry_guess,
3851  nullptr};
3852 }
#define CHECK_EQ(x, y)
Definition: Logger.h:214
std::unordered_map< const RelAlgNode *, int > get_input_nest_levels(const RelAlgNode *ra_node, const std::vector< size_t > &input_permutation)
std::vector< std::shared_ptr< Analyzer::Expr > > synthesize_inputs(const RelAlgNode *ra_node, const size_t nest_level, const std::vector< TargetMetaInfo > &in_metainfo, const std::unordered_map< const RelAlgNode *, int > &input_to_nest_level)
std::vector< Analyzer::Expr * > translate_targets(std::vector< std::shared_ptr< Analyzer::Expr >> &target_exprs_owned, const std::vector< std::shared_ptr< Analyzer::Expr >> &scalar_sources, const std::list< std::shared_ptr< Analyzer::Expr >> &groupby_exprs, const RelCompound *compound, const RelAlgTranslator &translator, const ExecutorType executor_type)
std::vector< std::shared_ptr< Analyzer::Expr > > target_exprs_owned_
std::list< std::shared_ptr< Analyzer::Expr > > translate_groupby_exprs(const RelCompound *compound, const std::vector< std::shared_ptr< Analyzer::Expr >> &scalar_sources)
const Catalog_Namespace::Catalog & cat_
const RelAlgNode * getInput(const size_t idx) const
JoinType get_join_type(const RelAlgNode *ra)
std::vector< TargetMetaInfo > get_targets_meta(const RA *ra_node, const std::vector< Analyzer::Expr * > &target_exprs)
static RegisteredQueryHint defaults()
Definition: QueryHint.h:175
std::shared_ptr< const query_state::QueryState > query_state_
size_t g_default_max_groups_buffer_entry_guess
Definition: Execute.cpp:102
void setOutputMetainfo(const std::vector< TargetMetaInfo > &targets_metainfo) const
std::unique_ptr< RelAlgDagBuilder > query_dag_
std::tuple< std::vector< InputDescriptor >, std::list< std::shared_ptr< const InputColDescriptor > >, std::vector< std::shared_ptr< RexInput > > > get_input_desc(const RA *ra_node, const std::unordered_map< const RelAlgNode *, int > &input_to_nest_level, const std::vector< size_t > &input_permutation, const Catalog_Namespace::Catalog &cat)
const size_t inputCount() const
const std::vector< TargetMetaInfo > & getOutputMetainfo() const
Executor * executor_

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

RelAlgExecutor::WorkUnit RelAlgExecutor::createCompoundWorkUnit ( const RelCompound * compound,
const SortInfo & sort_info,
const ExecutionOptions & eo 
)
private

Definition at line 3529 of file RelAlgExecutor.cpp.

References cat_, CHECK_EQ, RegisteredQueryHint::defaults(), anonymous_namespace{RelAlgExecutor.cpp}::do_table_reordering(), executor_, ExecutionOptions::executor_type, g_default_max_groups_buffer_entry_guess, g_from_table_reordering, anonymous_namespace{RelAlgExecutor.cpp}::get_input_desc(), anonymous_namespace{RelAlgExecutor.cpp}::get_input_nest_levels(), anonymous_namespace{RelAlgExecutor.cpp}::get_join_type(), anonymous_namespace{RelAlgExecutor.cpp}::get_left_deep_join_input_sizes(), get_table_infos(), anonymous_namespace{RelAlgExecutor.cpp}::get_targets_meta(), RelAlgNode::getInput(), RelAlgNode::inputCount(), ExecutionOptions::just_explain, LEFT, anonymous_namespace{RelAlgExecutor.cpp}::left_deep_join_types(), now_, shared::printContainer(), query_dag_, query_state_, anonymous_namespace{RelAlgExecutor.cpp}::rewrite_quals(), RelAlgNode::setOutputMetainfo(), RelAlgExecutionUnit::simple_quals, RelCompound::size(), target_exprs_owned_, anonymous_namespace{RelAlgExecutor.cpp}::translate_groupby_exprs(), anonymous_namespace{RelAlgExecutor.cpp}::translate_quals(), anonymous_namespace{RelAlgExecutor.cpp}::translate_scalar_sources(), anonymous_namespace{RelAlgExecutor.cpp}::translate_targets(), translateLeftDeepJoinFilter(), and VLOG.

Referenced by createWorkUnit(), executeCompound(), executeDelete(), executeUpdate(), and getOuterFragmentCount().

3532  {
3533  std::vector<InputDescriptor> input_descs;
3534  std::list<std::shared_ptr<const InputColDescriptor>> input_col_descs;
3535  auto input_to_nest_level = get_input_nest_levels(compound, {});
3536  std::tie(input_descs, input_col_descs, std::ignore) =
3537  get_input_desc(compound, input_to_nest_level, {}, cat_);
3538  VLOG(3) << "input_descs=" << shared::printContainer(input_descs);
3539  const auto query_infos = get_table_infos(input_descs, executor_);
3540  CHECK_EQ(size_t(1), compound->inputCount());
3541  const auto left_deep_join =
3542  dynamic_cast<const RelLeftDeepInnerJoin*>(compound->getInput(0));
3543  JoinQualsPerNestingLevel left_deep_join_quals;
3544  const auto join_types = left_deep_join ? left_deep_join_types(left_deep_join)
3545  : std::vector<JoinType>{get_join_type(compound)};
3546  std::vector<size_t> input_permutation;
3547  std::vector<size_t> left_deep_join_input_sizes;
3548  if (left_deep_join) {
3549  left_deep_join_input_sizes = get_left_deep_join_input_sizes(left_deep_join);
3550  left_deep_join_quals = translateLeftDeepJoinFilter(
3551  left_deep_join, input_descs, input_to_nest_level, eo.just_explain);
3552  if (g_from_table_reordering &&
3553  std::find(join_types.begin(), join_types.end(), JoinType::LEFT) ==
3554  join_types.end()) {
3555  input_permutation = do_table_reordering(input_descs,
3556  input_col_descs,
3557  left_deep_join_quals,
3558  input_to_nest_level,
3559  compound,
3560  query_infos,
3561  executor_);
3562  input_to_nest_level = get_input_nest_levels(compound, input_permutation);
3563  std::tie(input_descs, input_col_descs, std::ignore) =
3564  get_input_desc(compound, input_to_nest_level, input_permutation, cat_);
3565  left_deep_join_quals = translateLeftDeepJoinFilter(
3566  left_deep_join, input_descs, input_to_nest_level, eo.just_explain);
3567  }
3568  }
3569  RelAlgTranslator translator(cat_,
3570  query_state_,
3571  executor_,
3572  input_to_nest_level,
3573  join_types,
3574  now_,
3575  eo.just_explain);
3576  const auto scalar_sources =
3577  translate_scalar_sources(compound, translator, eo.executor_type);
3578  const auto groupby_exprs = translate_groupby_exprs(compound, scalar_sources);
3579  const auto quals_cf = translate_quals(compound, translator);
3580  const auto target_exprs = translate_targets(target_exprs_owned_,
3581  scalar_sources,
3582  groupby_exprs,
3583  compound,
3584  translator,
3585  eo.executor_type);
3586  CHECK_EQ(compound->size(), target_exprs.size());
3587  const RelAlgExecutionUnit exe_unit = {
3588  input_descs,
3589  input_col_descs,
3590  quals_cf.simple_quals,
3591  rewrite_quals(quals_cf.quals),
3592  left_deep_join_quals,
3593  groupby_exprs,
3594  target_exprs,
3595  nullptr,
3596  sort_info,
3597  0,
3598  query_dag_ ? query_dag_->getQueryHints() : RegisteredQueryHint::defaults(),
3599  false,
3600  std::nullopt,
3601  query_state_};
3602  auto query_rewriter = std::make_unique<QueryRewriter>(query_infos, executor_);
3603  const auto rewritten_exe_unit = query_rewriter->rewrite(exe_unit);
3604  const auto targets_meta = get_targets_meta(compound, rewritten_exe_unit.target_exprs);
3605  compound->setOutputMetainfo(targets_meta);
3606  return {rewritten_exe_unit,
3607  compound,
3608  g_default_max_groups_buffer_entry_guess,
3609  std::move(query_rewriter),
3610  input_permutation,
3611  left_deep_join_input_sizes};
3612 }
#define CHECK_EQ(x, y)
Definition: Logger.h:214
std::unordered_map< const RelAlgNode *, int > get_input_nest_levels(const RelAlgNode *ra_node, const std::vector< size_t > &input_permutation)
std::vector< JoinType > left_deep_join_types(const RelLeftDeepInnerJoin *left_deep_join)
JoinType
Definition: sqldefs.h:108
std::vector< size_t > do_table_reordering(std::vector< InputDescriptor > &input_descs, std::list< std::shared_ptr< const InputColDescriptor >> &input_col_descs, const JoinQualsPerNestingLevel &left_deep_join_quals, std::unordered_map< const RelAlgNode *, int > &input_to_nest_level, const RA *node, const std::vector< InputTableInfo > &query_infos, const Executor *executor)
size_t size() const override
std::vector< Analyzer::Expr * > translate_targets(std::vector< std::shared_ptr< Analyzer::Expr >> &target_exprs_owned, const std::vector< std::shared_ptr< Analyzer::Expr >> &scalar_sources, const std::list< std::shared_ptr< Analyzer::Expr >> &groupby_exprs, const RelCompound *compound, const RelAlgTranslator &translator, const ExecutorType executor_type)
std::vector< JoinCondition > JoinQualsPerNestingLevel
std::vector< std::shared_ptr< Analyzer::Expr > > target_exprs_owned_
bool g_from_table_reordering
Definition: Execute.cpp:84
ExecutorType executor_type
std::list< std::shared_ptr< Analyzer::Expr > > translate_groupby_exprs(const RelCompound *compound, const std::vector< std::shared_ptr< Analyzer::Expr >> &scalar_sources)
const Catalog_Namespace::Catalog & cat_
const RelAlgNode * getInput(const size_t idx) const
JoinType get_join_type(const RelAlgNode *ra)
std::vector< TargetMetaInfo > get_targets_meta(const RA *ra_node, const std::vector< Analyzer::Expr * > &target_exprs)
static RegisteredQueryHint defaults()
Definition: QueryHint.h:175
std::shared_ptr< const query_state::QueryState > query_state_
std::vector< std::shared_ptr< Analyzer::Expr > > translate_scalar_sources(const RA *ra_node, const RelAlgTranslator &translator, const ::ExecutorType executor_type)
size_t g_default_max_groups_buffer_entry_guess
Definition: Execute.cpp:102
std::vector< InputTableInfo > get_table_infos(const std::vector< InputDescriptor > &input_descs, Executor *executor)
void setOutputMetainfo(const std::vector< TargetMetaInfo > &targets_metainfo) const
std::unique_ptr< RelAlgDagBuilder > query_dag_
QualsConjunctiveForm translate_quals(const RelCompound *compound, const RelAlgTranslator &translator)
PrintContainer< CONTAINER > printContainer(CONTAINER &container)
Definition: misc.h:80
std::tuple< std::vector< InputDescriptor >, std::list< std::shared_ptr< const InputColDescriptor > >, std::vector< std::shared_ptr< RexInput > > > get_input_desc(const RA *ra_node, const std::unordered_map< const RelAlgNode *, int > &input_to_nest_level, const std::vector< size_t > &input_permutation, const Catalog_Namespace::Catalog &cat)
const size_t inputCount() const
Executor * executor_
#define VLOG(n)
Definition: Logger.h:300
std::list< std::shared_ptr< Analyzer::Expr > > simple_quals
JoinQualsPerNestingLevel translateLeftDeepJoinFilter(const RelLeftDeepInnerJoin *join, const std::vector< InputDescriptor > &input_descs, const std::unordered_map< const RelAlgNode *, int > &input_to_nest_level, const bool just_explain)
std::vector< size_t > get_left_deep_join_input_sizes(const RelLeftDeepInnerJoin *left_deep_join)
std::list< std::shared_ptr< Analyzer::Expr > > rewrite_quals(const std::list< std::shared_ptr< Analyzer::Expr >> &quals)

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

RelAlgExecutor::WorkUnit RelAlgExecutor::createFilterWorkUnit ( const RelFilter filter,
const SortInfo sort_info,
const bool  just_explain 
)
private

Definition at line 4214 of file RelAlgExecutor.cpp.

References cat_, CHECK_EQ, RegisteredQueryHint::defaults(), executor_, fold_expr(), g_default_max_groups_buffer_entry_guess, get_exprs_not_owned(), anonymous_namespace{RelAlgExecutor.cpp}::get_input_desc(), anonymous_namespace{RelAlgExecutor.cpp}::get_input_nest_levels(), anonymous_namespace{RelAlgExecutor.cpp}::get_inputs_meta(), anonymous_namespace{RelAlgExecutor.cpp}::get_join_type(), RelFilter::getCondition(), RelAlgNode::inputCount(), now_, query_dag_, query_state_, rewrite_expr(), RelAlgNode::setOutputMetainfo(), and target_exprs_owned_.

Referenced by createWorkUnit(), and executeFilter().

4216  {
4217  CHECK_EQ(size_t(1), filter->inputCount());
4218  std::vector<InputDescriptor> input_descs;
4219  std::list<std::shared_ptr<const InputColDescriptor>> input_col_descs;
4220  std::vector<TargetMetaInfo> in_metainfo;
4221  std::vector<std::shared_ptr<RexInput>> used_inputs_owned;
4222  std::vector<std::shared_ptr<Analyzer::Expr>> target_exprs_owned;
4223 
4224  const auto input_to_nest_level = get_input_nest_levels(filter, {});
4225  std::tie(input_descs, input_col_descs, used_inputs_owned) =
4226  get_input_desc(filter, input_to_nest_level, {}, cat_);
4227  const auto join_type = get_join_type(filter);
4228  RelAlgTranslator translator(cat_,
4229  query_state_,
4230  executor_,
4231  input_to_nest_level,
4232  {join_type},
4233  now_,
4234  just_explain);
4235  std::tie(in_metainfo, target_exprs_owned) =
4236  get_inputs_meta(filter, translator, used_inputs_owned, input_to_nest_level);
4237  const auto filter_expr = translator.translateScalarRex(filter->getCondition());
4238  const auto qual = fold_expr(filter_expr.get());
4239  target_exprs_owned_.insert(
4240  target_exprs_owned_.end(), target_exprs_owned.begin(), target_exprs_owned.end());
4241  const auto target_exprs = get_exprs_not_owned(target_exprs_owned);
4242  filter->setOutputMetainfo(in_metainfo);
4243  const auto rewritten_qual = rewrite_expr(qual.get());
4244  return {{input_descs,
4245  input_col_descs,
4246  {},
4247  {rewritten_qual ? rewritten_qual : qual},
4248  {},
4249  {nullptr},
4250  target_exprs,
4251  nullptr,
4252  sort_info,
4253  0,
4254  query_dag_ ? query_dag_->getQueryHints() : RegisteredQueryHint::defaults()},
4255  filter,
4256  g_default_max_groups_buffer_entry_guess,
4257  nullptr};
4258 }
#define CHECK_EQ(x, y)
Definition: Logger.h:214
std::unordered_map< const RelAlgNode *, int > get_input_nest_levels(const RelAlgNode *ra_node, const std::vector< size_t > &input_permutation)
std::pair< std::vector< TargetMetaInfo >, std::vector< std::shared_ptr< Analyzer::Expr > > > get_inputs_meta(const RelFilter *filter, const RelAlgTranslator &translator, const std::vector< std::shared_ptr< RexInput >> &inputs_owned, const std::unordered_map< const RelAlgNode *, int > &input_to_nest_level)
const RexScalar * getCondition() const
std::vector< Analyzer::Expr * > get_exprs_not_owned(const std::vector< std::shared_ptr< Analyzer::Expr >> &exprs)
Definition: Execute.h:252
Analyzer::ExpressionPtr rewrite_expr(const Analyzer::Expr *expr)
std::vector< std::shared_ptr< Analyzer::Expr > > target_exprs_owned_
const Catalog_Namespace::Catalog & cat_
JoinType get_join_type(const RelAlgNode *ra)
static RegisteredQueryHint defaults()
Definition: QueryHint.h:175
std::shared_ptr< const query_state::QueryState > query_state_
size_t g_default_max_groups_buffer_entry_guess
Definition: Execute.cpp:102
void setOutputMetainfo(const std::vector< TargetMetaInfo > &targets_metainfo) const
std::unique_ptr< RelAlgDagBuilder > query_dag_
std::tuple< std::vector< InputDescriptor >, std::list< std::shared_ptr< const InputColDescriptor > >, std::vector< std::shared_ptr< RexInput > > > get_input_desc(const RA *ra_node, const std::unordered_map< const RelAlgNode *, int > &input_to_nest_level, const std::vector< size_t > &input_permutation, const Catalog_Namespace::Catalog &cat)
const size_t inputCount() const
Executor * executor_
std::shared_ptr< Analyzer::Expr > fold_expr(const Analyzer::Expr *expr)

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

WorkUnit RelAlgExecutor::createJoinWorkUnit ( const RelJoin ,
const SortInfo ,
const bool  just_explain 
)
private
RelAlgExecutor::WorkUnit RelAlgExecutor::createProjectWorkUnit ( const RelProject project,
const SortInfo sort_info,
const ExecutionOptions eo 
)
private

Definition at line 3854 of file RelAlgExecutor.cpp.

References cat_, RegisteredQueryHint::defaults(), anonymous_namespace{RelAlgExecutor.cpp}::do_table_reordering(), executor_, ExecutionOptions::executor_type, g_default_max_groups_buffer_entry_guess, g_from_table_reordering, get_exprs_not_owned(), anonymous_namespace{RelAlgExecutor.cpp}::get_input_desc(), anonymous_namespace{RelAlgExecutor.cpp}::get_input_nest_levels(), anonymous_namespace{RelAlgExecutor.cpp}::get_join_type(), anonymous_namespace{RelAlgExecutor.cpp}::get_left_deep_join_input_sizes(), get_table_infos(), anonymous_namespace{RelAlgExecutor.cpp}::get_targets_meta(), RelAlgNode::getInput(), ExecutionOptions::just_explain, anonymous_namespace{RelAlgExecutor.cpp}::left_deep_join_types(), now_, query_dag_, query_state_, RelAlgNode::setOutputMetainfo(), target_exprs_owned_, anonymous_namespace{RelAlgExecutor.cpp}::translate_scalar_sources(), and translateLeftDeepJoinFilter().

Referenced by createWorkUnit(), executeDelete(), executeProject(), executeUpdate(), and getOuterFragmentCount().

3857  {
3858  std::vector<InputDescriptor> input_descs;
3859  std::list<std::shared_ptr<const InputColDescriptor>> input_col_descs;
3860  auto input_to_nest_level = get_input_nest_levels(project, {});
3861  std::tie(input_descs, input_col_descs, std::ignore) =
3862  get_input_desc(project, input_to_nest_level, {}, cat_);
3863  const auto query_infos = get_table_infos(input_descs, executor_);
3864 
3865  const auto left_deep_join =
3866  dynamic_cast<const RelLeftDeepInnerJoin*>(project->getInput(0));
3867  JoinQualsPerNestingLevel left_deep_join_quals;
3868  const auto join_types = left_deep_join ? left_deep_join_types(left_deep_join)
3869  : std::vector<JoinType>{get_join_type(project)};
3870  std::vector<size_t> input_permutation;
3871  std::vector<size_t> left_deep_join_input_sizes;
3872  if (left_deep_join) {
3873  left_deep_join_input_sizes = get_left_deep_join_input_sizes(left_deep_join);
3874  const auto query_infos = get_table_infos(input_descs, executor_);
3875  left_deep_join_quals = translateLeftDeepJoinFilter(
3876  left_deep_join, input_descs, input_to_nest_level, eo.just_explain);
3877  if (g_from_table_reordering) {
3878  input_permutation = do_table_reordering(input_descs,
3879  input_col_descs,
3880  left_deep_join_quals,
3881  input_to_nest_level,
3882  project,
3883  query_infos,
3884  executor_);
3885  input_to_nest_level = get_input_nest_levels(project, input_permutation);
3886  std::tie(input_descs, input_col_descs, std::ignore) =
3887  get_input_desc(project, input_to_nest_level, input_permutation, cat_);
3888  left_deep_join_quals = translateLeftDeepJoinFilter(
3889  left_deep_join, input_descs, input_to_nest_level, eo.just_explain);
3890  }
3891  }
3892 
3893  RelAlgTranslator translator(cat_,
3894  query_state_,
3895  executor_,
3896  input_to_nest_level,
3897  join_types,
3898  now_,
3899  eo.just_explain);
3900  const auto target_exprs_owned =
3901  translate_scalar_sources(project, translator, eo.executor_type);
3902  target_exprs_owned_.insert(
3903  target_exprs_owned_.end(), target_exprs_owned.begin(), target_exprs_owned.end());
3904  const auto target_exprs = get_exprs_not_owned(target_exprs_owned);
3905 
3906  const RelAlgExecutionUnit exe_unit = {
3907  input_descs,
3908  input_col_descs,
3909  {},
3910  {},
3911  left_deep_join_quals,
3912  {nullptr},
3913  target_exprs,
3914  nullptr,
3915  sort_info,
3916  0,
3917  query_dag_ ? query_dag_->getQueryHints() : RegisteredQueryHint::defaults(),
3918  false,
3919  std::nullopt,
3920  query_state_};
3921  auto query_rewriter = std::make_unique<QueryRewriter>(query_infos, executor_);
3922  const auto rewritten_exe_unit = query_rewriter->rewrite(exe_unit);
3923  const auto targets_meta = get_targets_meta(project, rewritten_exe_unit.target_exprs);
3924  project->setOutputMetainfo(targets_meta);
3925  return {rewritten_exe_unit,
3926  project,
3927  g_default_max_groups_buffer_entry_guess,
3928  std::move(query_rewriter),
3929  input_permutation,
3930  left_deep_join_input_sizes};
3931 }
std::unordered_map< const RelAlgNode *, int > get_input_nest_levels(const RelAlgNode *ra_node, const std::vector< size_t > &input_permutation)
std::vector< JoinType > left_deep_join_types(const RelLeftDeepInnerJoin *left_deep_join)
JoinType
Definition: sqldefs.h:108
std::vector< size_t > do_table_reordering(std::vector< InputDescriptor > &input_descs, std::list< std::shared_ptr< const InputColDescriptor >> &input_col_descs, const JoinQualsPerNestingLevel &left_deep_join_quals, std::unordered_map< const RelAlgNode *, int > &input_to_nest_level, const RA *node, const std::vector< InputTableInfo > &query_infos, const Executor *executor)
std::vector< JoinCondition > JoinQualsPerNestingLevel
std::vector< Analyzer::Expr * > get_exprs_not_owned(const std::vector< std::shared_ptr< Analyzer::Expr >> &exprs)
Definition: Execute.h:252
std::vector< std::shared_ptr< Analyzer::Expr > > target_exprs_owned_
bool g_from_table_reordering
Definition: Execute.cpp:84
ExecutorType executor_type
const Catalog_Namespace::Catalog & cat_
const RelAlgNode * getInput(const size_t idx) const
JoinType get_join_type(const RelAlgNode *ra)
std::vector< TargetMetaInfo > get_targets_meta(const RA *ra_node, const std::vector< Analyzer::Expr * > &target_exprs)
static RegisteredQueryHint defaults()
Definition: QueryHint.h:175
std::shared_ptr< const query_state::QueryState > query_state_
std::vector< std::shared_ptr< Analyzer::Expr > > translate_scalar_sources(const RA *ra_node, const RelAlgTranslator &translator, const ::ExecutorType executor_type)
size_t g_default_max_groups_buffer_entry_guess
Definition: Execute.cpp:102
std::vector< InputTableInfo > get_table_infos(const std::vector< InputDescriptor > &input_descs, Executor *executor)
void setOutputMetainfo(const std::vector< TargetMetaInfo > &targets_metainfo) const
std::unique_ptr< RelAlgDagBuilder > query_dag_
std::tuple< std::vector< InputDescriptor >, std::list< std::shared_ptr< const InputColDescriptor > >, std::vector< std::shared_ptr< RexInput > > > get_input_desc(const RA *ra_node, const std::unordered_map< const RelAlgNode *, int > &input_to_nest_level, const std::vector< size_t > &input_permutation, const Catalog_Namespace::Catalog &cat)
Executor * executor_
JoinQualsPerNestingLevel translateLeftDeepJoinFilter(const RelLeftDeepInnerJoin *join, const std::vector< InputDescriptor > &input_descs, const std::unordered_map< const RelAlgNode *, int > &input_to_nest_level, const bool just_explain)
std::vector< size_t > get_left_deep_join_input_sizes(const RelLeftDeepInnerJoin *left_deep_join)

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

RelAlgExecutor::WorkUnit RelAlgExecutor::createSortInputWorkUnit ( const RelSort sort,
const ExecutionOptions eo 
)
private

Definition at line 2671 of file RelAlgExecutor.cpp.

References CHECK_GT, RelSort::collationCount(), SpeculativeTopNBlacklist::contains(), createWorkUnit(), Default, anonymous_namespace{RelAlgExecutor.cpp}::first_oe_is_desc(), g_default_max_groups_buffer_entry_guess, anonymous_namespace{RelAlgExecutor.cpp}::get_order_entries(), anonymous_namespace{RelAlgExecutor.cpp}::get_scan_limit(), get_target_info(), RelAlgNode::getInput(), RelSort::getLimit(), RelSort::getOffset(), RelAlgExecutionUnit::input_descs, RelAlgNode::setOutputMetainfo(), speculative_topn_blacklist_, SpeculativeTopN, and StreamingTopN.

Referenced by executeRelAlgQuerySingleStep(), and executeSort().

2673  {
2674  const auto source = sort->getInput(0);
2675  const size_t limit = sort->getLimit();
2676  const size_t offset = sort->getOffset();
2677  const size_t scan_limit = sort->collationCount() ? 0 : get_scan_limit(source, limit);
2678  const size_t scan_total_limit =
2679  scan_limit ? get_scan_limit(source, scan_limit + offset) : 0;
2680  size_t max_groups_buffer_entry_guess{
2681  scan_total_limit ? scan_total_limit : g_default_max_groups_buffer_entry_guess};
2682  SortAlgorithm sort_algorithm{SortAlgorithm::SpeculativeTopN};
2683  const auto order_entries = get_order_entries(sort);
2684  SortInfo sort_info{order_entries, sort_algorithm, limit, offset};
2685  auto source_work_unit = createWorkUnit(source, sort_info, eo);
2686  const auto& source_exe_unit = source_work_unit.exe_unit;
2687 
2688  // we do not allow sorting geometry or array types
2689  for (auto order_entry : order_entries) {
2690  CHECK_GT(order_entry.tle_no, 0); // tle_no is a 1-base index
2691  const auto& te = source_exe_unit.target_exprs[order_entry.tle_no - 1];
2692  const auto& ti = get_target_info(te, false);
2693  if (ti.sql_type.is_geometry() || ti.sql_type.is_array()) {
2694  throw std::runtime_error(
2695  "Columns with geometry or array types cannot be used in an ORDER BY clause.");
2696  }
2697  }
2698 
2699  if (source_exe_unit.groupby_exprs.size() == 1) {
2700  if (!source_exe_unit.groupby_exprs.front()) {
2701  sort_algorithm = SortAlgorithm::StreamingTopN;
2702  } else {
2703  if (speculative_topn_blacklist_.contains(source_exe_unit.groupby_exprs.front(),
2704  first_oe_is_desc(order_entries))) {
2705  sort_algorithm = SortAlgorithm::Default;
2706  }
2707  }
2708  }
2709 
2710  sort->setOutputMetainfo(source->getOutputMetainfo());
2711  // NB: the `body` field of the returned `WorkUnit` needs to be the `source` node,
2712  // not the `sort`. The aggregator needs the pre-sorted result from leaves.
2713  return {RelAlgExecutionUnit{source_exe_unit.input_descs,
2714  std::move(source_exe_unit.input_col_descs),
2715  source_exe_unit.simple_quals,
2716  source_exe_unit.quals,
2717  source_exe_unit.join_quals,
2718  source_exe_unit.groupby_exprs,
2719  source_exe_unit.target_exprs,
2720  nullptr,
2721  {sort_info.order_entries, sort_algorithm, limit, offset},
2722  scan_total_limit,
2723  source_exe_unit.query_hint,
2724  source_exe_unit.use_bump_allocator,
2725  source_exe_unit.union_all,
2726  source_exe_unit.query_state},
2727  source,
2728  max_groups_buffer_entry_guess,
2729  std::move(source_work_unit.query_rewriter),
2730  source_work_unit.input_permutation,
2731  source_work_unit.left_deep_join_input_sizes};
2732 }
size_t getOffset() const
bool contains(const std::shared_ptr< Analyzer::Expr > expr, const bool desc) const
size_t get_scan_limit(const RelAlgNode *ra, const size_t limit)
TargetInfo get_target_info(const PointerType target_expr, const bool bigint_count)
Definition: TargetInfo.h:79
static SpeculativeTopNBlacklist speculative_topn_blacklist_
std::vector< InputDescriptor > input_descs
#define CHECK_GT(x, y)
Definition: Logger.h:218
WorkUnit createWorkUnit(const RelAlgNode *, const SortInfo &, const ExecutionOptions &eo)
SortAlgorithm
const RelAlgNode * getInput(const size_t idx) const
size_t collationCount() const
size_t getLimit() const
size_t g_default_max_groups_buffer_entry_guess
Definition: Execute.cpp:102
void setOutputMetainfo(const std::vector< TargetMetaInfo > &targets_metainfo) const
std::list< Analyzer::OrderEntry > get_order_entries(const RelSort *sort)
bool first_oe_is_desc(const std::list< Analyzer::OrderEntry > &order_entries)

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

RelAlgExecutor::TableFunctionWorkUnit RelAlgExecutor::createTableFunctionWorkUnit ( const RelTableFunction table_func,
const bool  just_explain,
const bool  is_gpu 
)
private

Definition at line 4047 of file RelAlgExecutor.cpp.

References bind_table_function(), cat_, CHECK, CHECK_EQ, CHECK_GT, RelTableFunction::countRexLiteralArgs(), test_fsi::d, DEFAULT_ROW_MULTIPLIER_VALUE, executor_, get_exprs_not_owned(), anonymous_namespace{RelAlgExecutor.cpp}::get_input_desc(), anonymous_namespace{RelAlgExecutor.cpp}::get_input_nest_levels(), get_table_infos(), anonymous_namespace{RelAlgExecutor.cpp}::get_targets_meta(), RelTableFunction::getColInputsSize(), RelTableFunction::getFunctionName(), RelTableFunction::getTableFuncInputAt(), i, kINT, LOG, Native, now_, query_state_, RelAlgNode::setOutputMetainfo(), TableFunctionExecutionUnit::target_exprs, target_exprs_owned_, to_string(), anonymous_namespace{RelAlgExecutor.cpp}::translate_scalar_sources(), UNREACHABLE, and logger::WARNING.

Referenced by executeTableFunction().

4050  {
4051  std::vector<InputDescriptor> input_descs;
4052  std::list<std::shared_ptr<const InputColDescriptor>> input_col_descs;
4053  auto input_to_nest_level = get_input_nest_levels(rel_table_func, {});
4054  std::tie(input_descs, input_col_descs, std::ignore) =
4055  get_input_desc(rel_table_func, input_to_nest_level, {}, cat_);
4056  const auto query_infos = get_table_infos(input_descs, executor_);
4057  RelAlgTranslator translator(
4058  cat_, query_state_, executor_, input_to_nest_level, {}, now_, just_explain);
4059  const auto input_exprs_owned =
4060  translate_scalar_sources(rel_table_func, translator, ::ExecutorType::Native);
4061  target_exprs_owned_.insert(
4062  target_exprs_owned_.end(), input_exprs_owned.begin(), input_exprs_owned.end());
4063  auto input_exprs = get_exprs_not_owned(input_exprs_owned);
4064 
4065  const auto table_function_impl_and_type_infos = [=]() {
4066  if (is_gpu) {
4067  try {
4068  return bind_table_function(
4069  rel_table_func->getFunctionName(), input_exprs_owned, is_gpu);
4070  } catch (ExtensionFunctionBindingError& e) {
4071  LOG(WARNING) << "createTableFunctionWorkUnit[GPU]: " << e.what()
4072  << " Redirecting " << rel_table_func->getFunctionName()
4073  << " to run on CPU.";
4074  throw QueryMustRunOnCpu();
4075  }
4076  } else {
4077  try {
4078  return bind_table_function(
4079  rel_table_func->getFunctionName(), input_exprs_owned, is_gpu);
4080  } catch (ExtensionFunctionBindingError& e) {
4081  LOG(WARNING) << "createTableFunctionWorkUnit[CPU]: " << e.what();
4082  throw;
4083  }
4084  }
4085  }();
4086  const auto& table_function_impl = std::get<0>(table_function_impl_and_type_infos);
4087  const auto& table_function_type_infos = std::get<1>(table_function_impl_and_type_infos);
4088 
4089  size_t output_row_sizing_param = 0;
4090  if (table_function_impl.hasUserSpecifiedOutputSizeMultiplier() ||
4091  table_function_impl.hasUserSpecifiedOutputSizeConstant()) {
4092  const auto parameter_index =
4093  table_function_impl.getOutputRowSizeParameter(table_function_type_infos);
4094  CHECK_GT(parameter_index, size_t(0));
4095  if (rel_table_func->countRexLiteralArgs() == table_function_impl.countScalarArgs()) {
4096  const auto parameter_expr =
4097  rel_table_func->getTableFuncInputAt(parameter_index - 1);
4098  const auto parameter_expr_literal = dynamic_cast<const RexLiteral*>(parameter_expr);
4099  if (!parameter_expr_literal) {
4100  throw std::runtime_error(
4101  "Provided output buffer sizing parameter is not a literal. Only literal "
4102  "values are supported with output buffer sizing configured table "
4103  "functions.");
4104  }
4105  int64_t literal_val = parameter_expr_literal->getVal<int64_t>();
4106  if (literal_val < 0) {
4107  throw std::runtime_error("Provided output sizing parameter " +
4108  std::to_string(literal_val) +
4109  " must be positive integer.");
4110  }
4111  output_row_sizing_param = static_cast<size_t>(literal_val);
4112  } else {
4113  // RowMultiplier not specified in the SQL query. Set it to 1
4114  output_row_sizing_param = 1; // default value for RowMultiplier
4115  static Datum d = {DEFAULT_ROW_MULTIPLIER_VALUE};
4116  static auto DEFAULT_ROW_MULTIPLIER_EXPR =
4117  makeExpr<Analyzer::Constant>(kINT, false, d);
4118  // Push the constant 1 to input_exprs
4119  input_exprs.insert(input_exprs.begin() + parameter_index - 1,
4120  DEFAULT_ROW_MULTIPLIER_EXPR.get());
4121  }
4122  } else if (table_function_impl.hasNonUserSpecifiedOutputSizeConstant()) {
4123  output_row_sizing_param = table_function_impl.getOutputRowSizeParameter();
4124  } else {
4125  UNREACHABLE();
4126  }
4127 
4128  std::vector<Analyzer::ColumnVar*> input_col_exprs;
4129  size_t input_index = 0;
4130  for (const auto& ti : table_function_type_infos) {
4131  if (ti.is_column_list()) {
4132  for (int i = 0; i < ti.get_dimension(); i++) {
4133  auto& input_expr = input_exprs[input_index];
4134  auto col_var = dynamic_cast<Analyzer::ColumnVar*>(input_expr);
4135  CHECK(col_var);
4136  input_expr->set_type_info(
4137  ti); // ti is shared in between all columns in the same column_list
4138  input_col_exprs.push_back(col_var);
4139  input_index++;
4140  }
4141  } else if (ti.is_column()) {
4142  auto& input_expr = input_exprs[input_index];
4143  auto col_var = dynamic_cast<Analyzer::ColumnVar*>(input_expr);
4144  CHECK(col_var);
4145  input_expr->set_type_info(ti);
4146  input_col_exprs.push_back(col_var);
4147  input_index++;
4148  } else {
4149  input_index++;
4150  }
4151  }
4152  CHECK_EQ(input_col_exprs.size(), rel_table_func->getColInputsSize());
4153  std::vector<Analyzer::Expr*> table_func_outputs;
4154  for (size_t i = 0; i < table_function_impl.getOutputsSize(); i++) {
4155  const auto ti = table_function_impl.getOutputSQLType(i);
4156  target_exprs_owned_.push_back(std::make_shared<Analyzer::ColumnVar>(ti, 0, i, -1));
4157  table_func_outputs.push_back(target_exprs_owned_.back().get());
4158  }
4159  const TableFunctionExecutionUnit exe_unit = {
4160  input_descs,
4161  input_col_descs,
4162  input_exprs, // table function inputs
4163  input_col_exprs, // table function column inputs (duplicates w/ above)
4164  table_func_outputs, // table function projected exprs
4165  output_row_sizing_param, // output buffer sizing param
4166  table_function_impl};
4167  const auto targets_meta = get_targets_meta(rel_table_func, exe_unit.target_exprs);
4168  rel_table_func->setOutputMetainfo(targets_meta);
4169  return {exe_unit, rel_table_func};
4170 }
#define CHECK_EQ(x, y)
Definition: Logger.h:214
std::unordered_map< const RelAlgNode *, int > get_input_nest_levels(const RelAlgNode *ra_node, const std::vector< size_t > &input_permutation)
tuple d
Definition: test_fsi.py:9
#define LOG(tag)
Definition: Logger.h:200
#define UNREACHABLE()
Definition: Logger.h:250
std::vector< Analyzer::Expr * > get_exprs_not_owned(const std::vector< std::shared_ptr< Analyzer::Expr >> &exprs)
Definition: Execute.h:252
#define CHECK_GT(x, y)
Definition: Logger.h:218
std::string to_string(char const *&&v)
std::vector< std::shared_ptr< Analyzer::Expr > > target_exprs_owned_
const Catalog_Namespace::Catalog & cat_
std::vector< TargetMetaInfo > get_targets_meta(const RA *ra_node, const std::vector< Analyzer::Expr * > &target_exprs)
#define DEFAULT_ROW_MULTIPLIER_VALUE
std::shared_ptr< const query_state::QueryState > query_state_
std::vector< std::shared_ptr< Analyzer::Expr > > translate_scalar_sources(const RA *ra_node, const RelAlgTranslator &translator, const ::ExecutorType executor_type)
const std::tuple< table_functions::TableFunction, std::vector< SQLTypeInfo > > bind_table_function(std::string name, Analyzer::ExpressionPtrVector input_args, const std::vector< table_functions::TableFunction > &table_funcs, const bool is_gpu)
#define CHECK(condition)
Definition: Logger.h:206
std::vector< InputTableInfo > get_table_infos(const std::vector< InputDescriptor > &input_descs, Executor *executor)
Definition: sqltypes.h:44
std::vector< Analyzer::Expr * > target_exprs
std::tuple< std::vector< InputDescriptor >, std::list< std::shared_ptr< const InputColDescriptor > >, std::vector< std::shared_ptr< RexInput > > > get_input_desc(const RA *ra_node, const std::unordered_map< const RelAlgNode *, int > &input_to_nest_level, const std::vector< size_t > &input_permutation, const Catalog_Namespace::Catalog &cat)
Executor * executor_

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

RelAlgExecutor::WorkUnit RelAlgExecutor::createUnionWorkUnit ( const RelLogicalUnion logical_union,
const SortInfo sort_info,
const ExecutionOptions eo 
)
private

Definition at line 3951 of file RelAlgExecutor.cpp.

References gpu_enabled::accumulate(), cat_, CHECK, RegisteredQueryHint::defaults(), executor_, g_default_max_groups_buffer_entry_guess, get_exprs_not_owned(), anonymous_namespace{RelAlgExecutor.cpp}::get_input_desc(), anonymous_namespace{RelAlgExecutor.cpp}::get_input_nest_levels(), get_table_infos(), anonymous_namespace{RelAlgExecutor.cpp}::get_targets_meta(), RelAlgNode::getInput(), RelAlgNode::getOutputMetainfo(), RelLogicalUnion::isAll(), ExecutionOptions::just_explain, now_, shared::printContainer(), query_dag_, query_state_, RelAlgNode::setOutputMetainfo(), anonymous_namespace{RelAlgExecutor.cpp}::target_exprs_for_union(), target_exprs_owned_, RelAlgNode::toString(), and VLOG.

Referenced by executeUnion().

3954  {
3955  std::vector<InputDescriptor> input_descs;
3956  std::list<std::shared_ptr<const InputColDescriptor>> input_col_descs;
3957  // Map ra input ptr to index (0, 1).
3958  auto input_to_nest_level = get_input_nest_levels(logical_union, {});
3959  std::tie(input_descs, input_col_descs, std::ignore) =
3960  get_input_desc(logical_union, input_to_nest_level, {}, cat_);
3961  const auto query_infos = get_table_infos(input_descs, executor_);
3962  auto const max_num_tuples =
3963  std::accumulate(query_infos.cbegin(),
3964  query_infos.cend(),
3965  size_t(0),
3966  [](auto max, auto const& query_info) {
3967  return std::max(max, query_info.info.getNumTuples());
3968  });
3969 
3970  VLOG(3) << "input_to_nest_level.size()=" << input_to_nest_level.size() << " Pairs are:";
3971  for (auto& pair : input_to_nest_level) {
3972  VLOG(3) << " (" << pair.first->toString() << ", " << pair.second << ')';
3973  }
3974 
3975  RelAlgTranslator translator(
3976  cat_, query_state_, executor_, input_to_nest_level, {}, now_, eo.just_explain);
3977 
3978  auto const input_exprs_owned = target_exprs_for_union(logical_union->getInput(0));
3979  CHECK(!input_exprs_owned.empty())
3980  << "No metainfo found for input node " << logical_union->getInput(0)->toString();
3981  VLOG(3) << "input_exprs_owned.size()=" << input_exprs_owned.size();
3982  for (auto& input_expr : input_exprs_owned) {
3983  VLOG(3) << " " << input_expr->toString();
3984  }
3985  target_exprs_owned_.insert(
3986  target_exprs_owned_.end(), input_exprs_owned.begin(), input_exprs_owned.end());
3987  const auto target_exprs = get_exprs_not_owned(input_exprs_owned);
3988 
3989  VLOG(3) << "input_descs=" << shared::printContainer(input_descs)
3990  << " input_col_descs=" << shared::printContainer(input_col_descs)
3991  << " target_exprs.size()=" << target_exprs.size()
3992  << " max_num_tuples=" << max_num_tuples;
3993 
3994  const RelAlgExecutionUnit exe_unit = {
3995  input_descs,
3996  input_col_descs,
3997  {}, // quals_cf.simple_quals,
3998  {}, // rewrite_quals(quals_cf.quals),
3999  {},
4000  {nullptr},
4001  target_exprs,
4002  nullptr,
4003  sort_info,
4004  max_num_tuples,
4005  query_dag_ ? query_dag_->getQueryHints() : RegisteredQueryHint::defaults(),
4006  false,
4007  logical_union->isAll(),
4008  query_state_};
4009  auto query_rewriter = std::make_unique<QueryRewriter>(query_infos, executor_);
4010  const auto rewritten_exe_unit = query_rewriter->rewrite(exe_unit);
4011 
4012  RelAlgNode const* input0 = logical_union->getInput(0);
4013  if (auto const* node = dynamic_cast<const RelCompound*>(input0)) {
4014  logical_union->setOutputMetainfo(
4015  get_targets_meta(node, rewritten_exe_unit.target_exprs));
4016  } else if (auto const* node = dynamic_cast<const RelProject*>(input0)) {
4017  logical_union->setOutputMetainfo(
4018  get_targets_meta(node, rewritten_exe_unit.target_exprs));
4019  } else if (auto const* node = dynamic_cast<const RelLogicalUnion*>(input0)) {
4020  logical_union->setOutputMetainfo(
4021  get_targets_meta(node, rewritten_exe_unit.target_exprs));
4022  } else if (auto const* node = dynamic_cast<const RelAggregate*>(input0)) {
4023  logical_union->setOutputMetainfo(
4024  get_targets_meta(node, rewritten_exe_unit.target_exprs));
4025  } else if (auto const* node = dynamic_cast<const RelScan*>(input0)) {
4026  logical_union->setOutputMetainfo(
4027  get_targets_meta(node, rewritten_exe_unit.target_exprs));
4028  } else if (auto const* node = dynamic_cast<const RelFilter*>(input0)) {
4029  logical_union->setOutputMetainfo(
4030  get_targets_meta(node, rewritten_exe_unit.target_exprs));
4031  } else if (dynamic_cast<const RelSort*>(input0)) {
4032  throw QueryNotSupported("LIMIT and OFFSET are not currently supported with UNION.");
4033  } else {
4034  throw QueryNotSupported("Unsupported input type: " + input0->toString());
4035  }
4036  VLOG(3) << "logical_union->getOutputMetainfo()="
4037  << shared::printContainer(logical_union->getOutputMetainfo())
4038  << " rewritten_exe_unit.input_col_descs.front()->getScanDesc().getTableId()="
4039  << rewritten_exe_unit.input_col_descs.front()->getScanDesc().getTableId();
4040 
4041  return {rewritten_exe_unit,
4042  logical_union,
4043  g_default_max_groups_buffer_entry_guess,
4044  std::move(query_rewriter)};
4045 }
bool isAll() const
std::unordered_map< const RelAlgNode *, int > get_input_nest_levels(const RelAlgNode *ra_node, const std::vector< size_t > &input_permutation)
std::vector< Analyzer::Expr * > get_exprs_not_owned(const std::vector< std::shared_ptr< Analyzer::Expr >> &exprs)
Definition: Execute.h:252
std::vector< std::shared_ptr< Analyzer::Expr > > target_exprs_owned_
const Catalog_Namespace::Catalog & cat_
const RelAlgNode * getInput(const size_t idx) const
DEVICE auto accumulate(ARGS &&...args)
Definition: gpu_enabled.h:42
std::vector< std::shared_ptr< Analyzer::Expr > > target_exprs_for_union(RelAlgNode const *input_node)
std::vector< TargetMetaInfo > get_targets_meta(const RA *ra_node, const std::vector< Analyzer::Expr * > &target_exprs)
static RegisteredQueryHint defaults()
Definition: QueryHint.h:175
std::shared_ptr< const query_state::QueryState > query_state_
size_t g_default_max_groups_buffer_entry_guess
Definition: Execute.cpp:102
virtual std::string toString() const =0
#define CHECK(condition)
Definition: Logger.h:206
std::vector< InputTableInfo > get_table_infos(const std::vector< InputDescriptor > &input_descs, Executor *executor)
void setOutputMetainfo(const std::vector< TargetMetaInfo > &targets_metainfo) const
std::unique_ptr< RelAlgDagBuilder > query_dag_
PrintContainer< CONTAINER > printContainer(CONTAINER &container)
Definition: misc.h:80
std::tuple< std::vector< InputDescriptor >, std::list< std::shared_ptr< const InputColDescriptor > >, std::vector< std::shared_ptr< RexInput > > > get_input_desc(const RA *ra_node, const std::unordered_map< const RelAlgNode *, int > &input_to_nest_level, const std::vector< size_t > &input_permutation, const Catalog_Namespace::Catalog &cat)
const std::vector< TargetMetaInfo > & getOutputMetainfo() const
Executor * executor_
#define VLOG(n)
Definition: Logger.h:300

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

std::unique_ptr< WindowFunctionContext > RelAlgExecutor::createWindowFunctionContext ( const Analyzer::WindowFunction window_func,
const std::shared_ptr< Analyzer::BinOper > &  partition_key_cond,
const RelAlgExecutionUnit ra_exe_unit,
const std::vector< InputTableInfo > &  query_infos,
const CompilationOptions co,
ColumnCacheMap column_cache_map,
std::shared_ptr< RowSetMemoryOwner row_set_mem_owner 
)
private

Definition at line 1935 of file RelAlgExecutor.cpp.

References CHECK, CHECK_EQ, Data_Namespace::CPU_LEVEL, CompilationOptions::device_type, executor_, ColumnFetcher::getOneColumnFragment(), Analyzer::WindowFunction::getOrderKeys(), GPU, Data_Namespace::GPU_LEVEL, INVALID, OneToMany, and RelAlgExecutionUnit::query_hint.

Referenced by computeWindow().

1942  {
1943  const auto memory_level = co.device_type == ExecutorDeviceType::GPU
1946  const auto join_table_or_err =
1947  executor_->buildHashTableForQualifier(partition_key_cond,
1948  query_infos,
1949  memory_level,
1950  JoinType::INVALID, // for window function
1952  column_cache_map,
1953  ra_exe_unit.query_hint);
1954  if (!join_table_or_err.fail_reason.empty()) {
1955  throw std::runtime_error(join_table_or_err.fail_reason);
1956  }
1957  CHECK(join_table_or_err.hash_table->getHashType() == HashType::OneToMany);
1958  const auto& order_keys = window_func->getOrderKeys();
1959  std::vector<std::shared_ptr<Chunk_NS::Chunk>> chunks_owner;
1960  const size_t elem_count = query_infos.front().info.fragments.front().getNumTuples();
1961  auto context = std::make_unique<WindowFunctionContext>(window_func,
1962  join_table_or_err.hash_table,
1963  elem_count,
1964  co.device_type,
1965  row_set_mem_owner);
1966  for (const auto& order_key : order_keys) {
1967  const auto order_col =
1968  std::dynamic_pointer_cast<const Analyzer::ColumnVar>(order_key);
1969  if (!order_col) {
1970  throw std::runtime_error("Only order by columns supported for now");
1971  }
1972  const int8_t* column;
1973  size_t join_col_elem_count;
1974  std::tie(column, join_col_elem_count) =
1976  *order_col,
1977  query_infos.front().info.fragments.front(),
1978  memory_level,
1979  0,
1980  nullptr,
1981  /*thread_idx=*/0,
1982  chunks_owner,
1983  column_cache_map);
1984  CHECK_EQ(join_col_elem_count, elem_count);
1985  context->addOrderColumn(column, order_col.get(), chunks_owner);
1986  }
1987  return context;
1988 }
#define CHECK_EQ(x, y)
Definition: Logger.h:214
const std::vector< std::shared_ptr< Analyzer::Expr > > & getOrderKeys() const
Definition: Analyzer.h:1455
ExecutorDeviceType device_type
static std::pair< const int8_t *, size_t > getOneColumnFragment(Executor *executor, const Analyzer::ColumnVar &hash_col, const Fragmenter_Namespace::FragmentInfo &fragment, const Data_Namespace::MemoryLevel effective_mem_lvl, const int device_id, DeviceAllocator *device_allocator, const size_t thread_idx, std::vector< std::shared_ptr< Chunk_NS::Chunk >> &chunks_owner, ColumnCacheMap &column_cache)
Gets one chunk's pointer and element count on either CPU or GPU.
RegisteredQueryHint query_hint
#define CHECK(condition)
Definition: Logger.h:206
Executor * executor_

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

RelAlgExecutor::WorkUnit RelAlgExecutor::createWorkUnit ( const RelAlgNode node,
const SortInfo sort_info,
const ExecutionOptions eo 
)
private

Definition at line 3357 of file RelAlgExecutor.cpp.

References createAggregateWorkUnit(), createCompoundWorkUnit(), createFilterWorkUnit(), createProjectWorkUnit(), logger::FATAL, ExecutionOptions::just_explain, LOG, and RelAlgNode::toString().

Referenced by createSortInputWorkUnit().

3359  {
3360  const auto compound = dynamic_cast<const RelCompound*>(node);
3361  if (compound) {
3362  return createCompoundWorkUnit(compound, sort_info, eo);
3363  }
3364  const auto project = dynamic_cast<const RelProject*>(node);
3365  if (project) {
3366  return createProjectWorkUnit(project, sort_info, eo);
3367  }
3368  const auto aggregate = dynamic_cast<const RelAggregate*>(node);
3369  if (aggregate) {
3370  return createAggregateWorkUnit(aggregate, sort_info, eo.just_explain);
3371  }
3372  const auto filter = dynamic_cast<const RelFilter*>(node);
3373  if (filter) {
3374  return createFilterWorkUnit(filter, sort_info, eo.just_explain);
3375  }
3376  LOG(FATAL) << "Unhandled node type: " << node->toString();
3377  return {};
3378 }
WorkUnit createProjectWorkUnit(const RelProject *, const SortInfo &, const ExecutionOptions &eo)
WorkUnit createAggregateWorkUnit(const RelAggregate *, const SortInfo &, const bool just_explain)
#define LOG(tag)
Definition: Logger.h:200
WorkUnit createCompoundWorkUnit(const RelCompound *, const SortInfo &, const ExecutionOptions &eo)
virtual std::string toString() const =0
WorkUnit createFilterWorkUnit(const RelFilter *, const SortInfo &, const bool just_explain)

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void RelAlgExecutor::eraseFromTemporaryTables ( const int  table_id)
inlineprivate

Definition at line 342 of file RelAlgExecutor.h.

References temporary_tables_.

342 { temporary_tables_.erase(table_id); }
TemporaryTables temporary_tables_
ExecutionResult RelAlgExecutor::executeAggregate ( const RelAggregate aggregate,
const CompilationOptions co,
const ExecutionOptions eo,
RenderInfo render_info,
const int64_t  queue_time_ms 
)
private

Definition at line 1745 of file RelAlgExecutor.cpp.

References createAggregateWorkUnit(), DEBUG_TIMER, Default, executeWorkUnit(), RelAlgNode::getOutputMetainfo(), and ExecutionOptions::just_explain.

Referenced by executeRelAlgStep().

1749  {
1750  auto timer = DEBUG_TIMER(__func__);
1751  const auto work_unit = createAggregateWorkUnit(
1752  aggregate, {{}, SortAlgorithm::Default, 0, 0}, eo.just_explain);
1753  return executeWorkUnit(work_unit,
1754  aggregate->getOutputMetainfo(),
1755  true,
1756  co,
1757  eo,
1758  render_info,
1759  queue_time_ms);
1760 }
WorkUnit createAggregateWorkUnit(const RelAggregate *, const SortInfo &, const bool just_explain)
#define DEBUG_TIMER(name)
Definition: Logger.h:322
ExecutionResult executeWorkUnit(const WorkUnit &work_unit, const std::vector< TargetMetaInfo > &targets_meta, const bool is_agg, const CompilationOptions &co_in, const ExecutionOptions &eo, RenderInfo *, const int64_t queue_time_ms, const std::optional< size_t > previous_count=std::nullopt)
const std::vector< TargetMetaInfo > & getOutputMetainfo() const

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

ExecutionResult RelAlgExecutor::executeCompound ( const RelCompound compound,
const CompilationOptions co,
const ExecutionOptions eo,
RenderInfo render_info,
const int64_t  queue_time_ms 
)
private

Definition at line 1727 of file RelAlgExecutor.cpp.

References createCompoundWorkUnit(), DEBUG_TIMER, Default, executeWorkUnit(), RelAlgNode::getOutputMetainfo(), and RelCompound::isAggregate().

Referenced by executeRelAlgStep().

1731  {
1732  auto timer = DEBUG_TIMER(__func__);
1733  const auto work_unit =
1734  createCompoundWorkUnit(compound, {{}, SortAlgorithm::Default, 0, 0}, eo);
1735  CompilationOptions co_compound = co;
1736  return executeWorkUnit(work_unit,
1737  compound->getOutputMetainfo(),
1738  compound->isAggregate(),
1739  co_compound,
1740  eo,
1741  render_info,
1742  queue_time_ms);
1743 }
WorkUnit createCompoundWorkUnit(const RelCompound *, const SortInfo &, const ExecutionOptions &eo)
bool isAggregate() const
#define DEBUG_TIMER(name)
Definition: Logger.h:322
ExecutionResult executeWorkUnit(const WorkUnit &work_unit, const std::vector< TargetMetaInfo > &targets_meta, const bool is_agg, const CompilationOptions &co_in, const ExecutionOptions &eo, RenderInfo *, const int64_t queue_time_ms, const std::optional< size_t > previous_count=std::nullopt)
const std::vector< TargetMetaInfo > & getOutputMetainfo() const

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void RelAlgExecutor::executeDelete ( const RelAlgNode node,
const CompilationOptions co,
const ExecutionOptions eo_in,
const int64_t  queue_time_ms 
)
private

Definition at line 1627 of file RelAlgExecutor.cpp.

References cat_, CHECK, CHECK_EQ, createCompoundWorkUnit(), createProjectWorkUnit(), DEBUG_TIMER, Default, dml_transaction_parameters_, executor_, CompilationOptions::filter_on_deleted_column, get_table_infos(), get_temporary_table(), Catalog_Namespace::Catalog::getDeletedColumn(), QueryExecutionError::getErrorCode(), getErrorMessageFromCode(), CacheInvalidator< CACHE_HOLDING_TYPES >::invalidateCaches(), CompilationOptions::makeCpuOnly(), ExecutionOptions::output_columnar_hint, post_execution_callback_, table_is_temporary(), temporary_tables_, and StorageIOFacility::yieldDeleteCallback().

Referenced by executeRelAlgStep().

1630  {
1631  CHECK(node);
1632  auto timer = DEBUG_TIMER(__func__);
1633 
1635 
1636  auto execute_delete_for_node = [this, &co, &eo_in](const auto node,
1637  auto& work_unit,
1638  const bool is_aggregate) {
1639  auto* table_descriptor = node->getModifiedTableDescriptor();
1640  CHECK(table_descriptor);
1641  if (!table_descriptor->hasDeletedCol) {
1642  throw std::runtime_error(
1643  "DELETE queries are only supported on tables with the vacuum attribute set to "
1644  "'delayed'");
1645  }
1646 
1647  const auto table_infos = get_table_infos(work_unit.exe_unit, executor_);
1648 
1649  auto execute_delete_ra_exe_unit =
1650  [this, &table_infos, &table_descriptor, &eo_in, &co](const auto& exe_unit,
1651  const bool is_aggregate) {
1653  std::make_unique<DeleteTransactionParameters>(table_descriptor);
1654  auto delete_params = dynamic_cast<DeleteTransactionParameters*>(
1656  CHECK(delete_params);
1657  auto delete_callback = yieldDeleteCallback(*delete_params);
1659 
1660  auto eo = eo_in;
1661  if (dml_transaction_parameters_->tableIsTemporary()) {
1662  eo.output_columnar_hint = true;
1663  co_delete.filter_on_deleted_column =
1664  false; // project the entire delete column for columnar update
1665  } else {
1666  CHECK_EQ(exe_unit.target_exprs.size(), size_t(1));
1667  }
1668 
1669  try {
1670  auto table_update_metadata =
1671  executor_->executeUpdate(exe_unit,
1672  table_infos,
1673  co_delete,
1674  eo,
1675  cat_,
1676  executor_->row_set_mem_owner_,
1677  delete_callback,
1678  is_aggregate);
1679  post_execution_callback_ = [table_update_metadata, this]() {
1680  dml_transaction_parameters_->finalizeTransaction(cat_);
1681  TableOptimizer table_optimizer{
1682  dml_transaction_parameters_->getTableDescriptor(), executor_, cat_};
1683  table_optimizer.vacuumFragmentsAboveMinSelectivity(table_update_metadata);
1684  };
1685  } catch (const QueryExecutionError& e) {
1686  throw std::runtime_error(getErrorMessageFromCode(e.getErrorCode()));
1687  }
1688  };
1689 
1690  if (table_is_temporary(table_descriptor)) {
1691  auto query_rewrite = std::make_unique<QueryRewriter>(table_infos, executor_);
1692  auto cd = cat_.getDeletedColumn(table_descriptor);
1693  CHECK(cd);
1694  auto delete_column_expr = makeExpr<Analyzer::ColumnVar>(
1695  cd->columnType, table_descriptor->tableId, cd->columnId, 0);
1696  const auto rewritten_exe_unit =
1697  query_rewrite->rewriteColumnarDelete(work_unit.exe_unit, delete_column_expr);
1698  execute_delete_ra_exe_unit(rewritten_exe_unit, is_aggregate);
1699  } else {
1700  execute_delete_ra_exe_unit(work_unit.exe_unit, is_aggregate);
1701  }
1702  };
1703 
1704  if (auto compound = dynamic_cast<const RelCompound*>(node)) {
1705  const auto work_unit =
1706  createCompoundWorkUnit(compound, {{}, SortAlgorithm::Default, 0, 0}, eo_in);
1707  execute_delete_for_node(compound, work_unit, compound->isAggregate());
1708  } else if (auto project = dynamic_cast<const RelProject*>(node)) {
1709  auto work_unit =
1710  createProjectWorkUnit(project, {{}, SortAlgorithm::Default, 0, 0}, eo_in);
1711  if (project->isSimple()) {
1712  CHECK_EQ(size_t(1), project->inputCount());
1713  const auto input_ra = project->getInput(0);
1714  if (dynamic_cast<const RelSort*>(input_ra)) {
1715  const auto& input_table =
1716  get_temporary_table(&temporary_tables_, -input_ra->getId());
1717  CHECK(input_table);
1718  work_unit.exe_unit.scan_limit = input_table->rowCount();
1719  }
1720  }
1721  execute_delete_for_node(project, work_unit, false);
1722  } else {
1723  throw std::runtime_error("Unsupported parent node for delete: " + node->toString());
1724  }
1725 }
#define CHECK_EQ(x, y)
Definition: Logger.h:214
std::optional< std::function< void()> > post_execution_callback_
int32_t getErrorCode() const
Definition: ErrorHandling.h:55
WorkUnit createProjectWorkUnit(const RelProject *, const SortInfo &, const ExecutionOptions &eo)
const ColumnDescriptor * getDeletedColumn(const TableDescriptor *td) const
Definition: Catalog.cpp:3126
StorageIOFacility::UpdateCallback yieldDeleteCallback(DeleteTransactionParameters &delete_parameters)
TemporaryTables temporary_tables_
Driver for running cleanup processes on a table. TableOptimizer provides functions for various cleanu...
static void invalidateCaches()
WorkUnit createCompoundWorkUnit(const RelCompound *, const SortInfo &, const ExecutionOptions &eo)
const ResultSetPtr & get_temporary_table(const TemporaryTables *temporary_tables, const int table_id)
Definition: Execute.h:229
static CompilationOptions makeCpuOnly(const CompilationOptions &in)
const Catalog_Namespace::Catalog & cat_
bool table_is_temporary(const TableDescriptor *const td)
#define CHECK(condition)
Definition: Logger.h:206
std::vector< InputTableInfo > get_table_infos(const std::vector< InputDescriptor > &input_descs, Executor *executor)
#define DEBUG_TIMER(name)
Definition: Logger.h:322
static std::string getErrorMessageFromCode(const int32_t error_code)
std::unique_ptr< TransactionParameters > dml_transaction_parameters_
Executor * executor_

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

ExecutionResult RelAlgExecutor::executeFilter ( const RelFilter filter,
const CompilationOptions co,
const ExecutionOptions eo,
RenderInfo render_info,
const int64_t  queue_time_ms 
)
private

Definition at line 1990 of file RelAlgExecutor.cpp.

References createFilterWorkUnit(), DEBUG_TIMER, Default, executeWorkUnit(), RelAlgNode::getOutputMetainfo(), and ExecutionOptions::just_explain.

Referenced by executeRelAlgStep().

1994  {
1995  auto timer = DEBUG_TIMER(__func__);
1996  const auto work_unit =
1997  createFilterWorkUnit(filter, {{}, SortAlgorithm::Default, 0, 0}, eo.just_explain);
1998  return executeWorkUnit(
1999  work_unit, filter->getOutputMetainfo(), false, co, eo, render_info, queue_time_ms);
2000 }
#define DEBUG_TIMER(name)
Definition: Logger.h:322
ExecutionResult executeWorkUnit(const WorkUnit &work_unit, const std::vector< TargetMetaInfo > &targets_meta, const bool is_agg, const CompilationOptions &co_in, const ExecutionOptions &eo, RenderInfo *, const int64_t queue_time_ms, const std::optional< size_t > previous_count=std::nullopt)
const std::vector< TargetMetaInfo > & getOutputMetainfo() const
WorkUnit createFilterWorkUnit(const RelFilter *, const SortInfo &, const bool just_explain)

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

ExecutionResult RelAlgExecutor::executeLogicalValues ( const RelLogicalValues logical_values,
const ExecutionOptions eo 
)
private

Definition at line 2055 of file RelAlgExecutor.cpp.

References CPU, DEBUG_TIMER, executor_, RelLogicalValues::getNumRows(), RelLogicalValues::getTupleType(), i, kBIGINT, kCOUNT, kNULLT, Projection, query_mem_desc, and RelAlgNode::setOutputMetainfo().

Referenced by executeRelAlgStep().

2057  {
2058  auto timer = DEBUG_TIMER(__func__);
2060  logical_values->getNumRows(),
2062  /*is_table_function=*/false);
2063 
2064  auto tuple_type = logical_values->getTupleType();
2065  for (size_t i = 0; i < tuple_type.size(); ++i) {
2066  auto& target_meta_info = tuple_type[i];
2067  if (target_meta_info.get_type_info().is_varlen()) {
2068  throw std::runtime_error("Variable length types not supported in VALUES yet.");
2069  }
2070  if (target_meta_info.get_type_info().get_type() == kNULLT) {
2071  // replace w/ bigint
2072  tuple_type[i] =
2073  TargetMetaInfo(target_meta_info.get_resname(), SQLTypeInfo(kBIGINT, false));
2074  }
2075  query_mem_desc.addColSlotInfo(
2076  {std::make_tuple(tuple_type[i].get_type_info().get_size(), 8)});
2077  }
2078  logical_values->setOutputMetainfo(tuple_type);
2079 
2080  std::vector<TargetInfo> target_infos;
2081  for (const auto& tuple_type_component : tuple_type) {
2082  target_infos.emplace_back(TargetInfo{false,
2083  kCOUNT,
2084  tuple_type_component.get_type_info(),
2085  SQLTypeInfo(kNULLT, false),
2086  false,
2087  false});
2088  }
2089 
2090  std::shared_ptr<ResultSet> rs{
2091  ResultSetLogicalValuesBuilder{logical_values,
2092  target_infos,
2095  executor_->getRowSetMemoryOwner(),
2096  executor_}
2097  .build()};
2098 
2099  return {rs, tuple_type};
2100 }
size_t getNumRows() const
const std::vector< TargetMetaInfo > getTupleType() const
Definition: sqldefs.h:76
#define DEBUG_TIMER(name)
Definition: Logger.h:322
void setOutputMetainfo(const std::vector< TargetMetaInfo > &targets_metainfo) const
Executor * executor_

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

ExecutionResult RelAlgExecutor::executeModify ( const RelModify modify,
const ExecutionOptions eo 
)
private

Definition at line 2150 of file RelAlgExecutor.cpp.

References CPU, DEBUG_TIMER, executor_, and ExecutionOptions::just_explain.

Referenced by executeRelAlgStep().

2151  {
2152  auto timer = DEBUG_TIMER(__func__);
2153  if (eo.just_explain) {
2154  throw std::runtime_error("EXPLAIN not supported for ModifyTable");
2155  }
2156 
2157  auto rs = std::make_shared<ResultSet>(TargetInfoList{},
2160  executor_->getRowSetMemoryOwner(),
2161  executor_->getCatalog(),
2162  executor_->blockSize(),
2163  executor_->gridSize());
2164 
2165  std::vector<TargetMetaInfo> empty_targets;
2166  return {rs, empty_targets};
2167 }
std::vector< TargetInfo > TargetInfoList
#define DEBUG_TIMER(name)
Definition: Logger.h:322
Executor * executor_

+ Here is the caller graph for this function:

void RelAlgExecutor::executePostExecutionCallback ( )

Definition at line 3350 of file RelAlgExecutor.cpp.

References post_execution_callback_, and VLOG.

3350  {
3352  VLOG(1) << "Running post execution callback.";
3353  (*post_execution_callback_)();
3354  }
3355 }
std::optional< std::function< void()> > post_execution_callback_
#define VLOG(n)
Definition: Logger.h:300
ExecutionResult RelAlgExecutor::executeProject ( const RelProject project,
const CompilationOptions co,
const ExecutionOptions eo,
RenderInfo render_info,
const int64_t  queue_time_ms,
const std::optional< size_t >  previous_count 
)
private

Definition at line 1775 of file RelAlgExecutor.cpp.

References CHECK, CHECK_EQ, CPU, createProjectWorkUnit(), DEBUG_TIMER, Default, executeWorkUnit(), get_temporary_table(), RelAlgNode::getInput(), RelAlgNode::getOutputMetainfo(), RelAlgNode::inputCount(), RelProject::isSimple(), and temporary_tables_.

Referenced by executeRelAlgStep().

1781  {
1782  auto timer = DEBUG_TIMER(__func__);
1783  auto work_unit = createProjectWorkUnit(project, {{}, SortAlgorithm::Default, 0, 0}, eo);
1784  CompilationOptions co_project = co;
1785  if (project->isSimple()) {
1786  CHECK_EQ(size_t(1), project->inputCount());
1787  const auto input_ra = project->getInput(0);
1788  if (dynamic_cast<const RelSort*>(input_ra)) {
1789  co_project.device_type = ExecutorDeviceType::CPU;
1790  const auto& input_table =
1791  get_temporary_table(&temporary_tables_, -input_ra->getId());
1792  CHECK(input_table);
1793  work_unit.exe_unit.scan_limit =
1794  std::min(input_table->getLimit(), input_table->rowCount());
1795  }
1796  }
1797  return executeWorkUnit(work_unit,
1798  project->getOutputMetainfo(),
1799  false,
1800  co_project,
1801  eo,
1802  render_info,
1803  queue_time_ms,
1804  previous_count);
1805 }
#define CHECK_EQ(x, y)
Definition: Logger.h:214
WorkUnit createProjectWorkUnit(const RelProject *, const SortInfo &, const ExecutionOptions &eo)
TemporaryTables temporary_tables_
const ResultSetPtr & get_temporary_table(const TemporaryTables *temporary_tables, const int table_id)
Definition: Execute.h:229
const RelAlgNode * getInput(const size_t idx) const
bool isSimple() const
#define CHECK(condition)
Definition: Logger.h:206
#define DEBUG_TIMER(name)
Definition: Logger.h:322
const size_t inputCount() const
ExecutionResult executeWorkUnit(const WorkUnit &work_unit, const std::vector< TargetMetaInfo > &targets_meta, const bool is_agg, const CompilationOptions &co_in, const ExecutionOptions &eo, RenderInfo *, const int64_t queue_time_ms, const std::optional< size_t > previous_count=std::nullopt)
const std::vector< TargetMetaInfo > & getOutputMetainfo() const

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

ExecutionResult RelAlgExecutor::executeRelAlgQuery ( const CompilationOptions co,
const ExecutionOptions eo,
const bool  just_explain_plan,
RenderInfo render_info 
)

Definition at line 236 of file RelAlgExecutor.cpp.

References CHECK, DEBUG_TIMER, executeRelAlgQueryNoRetry(), g_allow_cpu_retry, logger::INFO, INJECT_TIMER, LOG, CompilationOptions::makeCpuOnly(), post_execution_callback_, query_dag_, run_benchmark::run_query(), RenderInfo::setForceNonInSituData(), and VLOG.

239  {
240  CHECK(query_dag_);
241  auto timer = DEBUG_TIMER(__func__);
243 
244  auto run_query = [&](const CompilationOptions& co_in) {
245  auto execution_result =
246  executeRelAlgQueryNoRetry(co_in, eo, just_explain_plan, render_info);
248  VLOG(1) << "Running post execution callback.";
249  (*post_execution_callback_)();
250  }
251  return execution_result;
252  };
253 
254  try {
255  return run_query(co);
256  } catch (const QueryMustRunOnCpu&) {
257  if (!g_allow_cpu_retry) {
258  throw;
259  }
260  }
261  LOG(INFO) << "Query unable to run in GPU mode, retrying on CPU";
262  auto co_cpu = CompilationOptions::makeCpuOnly(co);
263 
264  if (render_info) {
265  render_info->setForceNonInSituData();
266  }
267  return run_query(co_cpu);
268 }
std::optional< std::function< void()> > post_execution_callback_
void setForceNonInSituData()
Definition: RenderInfo.cpp:45
#define LOG(tag)
Definition: Logger.h:200
static CompilationOptions makeCpuOnly(const CompilationOptions &in)
#define INJECT_TIMER(DESC)
Definition: measure.h:93
ExecutionResult executeRelAlgQueryNoRetry(const CompilationOptions &co, const ExecutionOptions &eo, const bool just_explain_plan, RenderInfo *render_info)
ExecutionResult executeRelAlgQuery(const CompilationOptions &co, const ExecutionOptions &eo, const bool just_explain_plan, RenderInfo *render_info)
#define CHECK(condition)
Definition: Logger.h:206
#define DEBUG_TIMER(name)
Definition: Logger.h:322
std::unique_ptr< RelAlgDagBuilder > query_dag_
bool g_allow_cpu_retry
Definition: Execute.cpp:81
#define VLOG(n)
Definition: Logger.h:300

+ Here is the call graph for this function:

ExecutionResult RelAlgExecutor::executeRelAlgQueryNoRetry ( const CompilationOptions co,
const ExecutionOptions eo,
const bool  just_explain_plan,
RenderInfo render_info 
)
private

Definition at line 270 of file RelAlgExecutor.cpp.

References cat_, CHECK, cleanupPostExecution(), DEBUG_TIMER, Executor::ERR_INTERRUPTED, executeRelAlgQueryWithFilterPushDown(), executeRelAlgSeq(), executor_, g_enable_dynamic_watchdog, g_pending_query_interrupt_freq, get_physical_inputs(), get_physical_table_inputs(), QueryExecutionError::getErrorCode(), getSubqueries(), i, INJECT_TIMER, join(), ExecutionOptions::just_calcite_explain, ExecutionOptions::just_explain, ExecutionOptions::just_validate, anonymous_namespace{RelAlgExecutor.cpp}::prepare_foreign_table_for_execution(), query_dag_, query_state_, run_benchmark_import::result, gpu_enabled::reverse(), gpu_enabled::sort(), timer_start(), timer_stop(), and to_string().

Referenced by executeRelAlgQuery().

273  {
275  auto timer = DEBUG_TIMER(__func__);
276  auto timer_setup = DEBUG_TIMER("Query pre-execution steps");
277 
278  query_dag_->resetQueryExecutionState();
279  const auto& ra = query_dag_->getRootNode();
280 
281  // capture the lock acquisition time
282  auto clock_begin = timer_start();
284  executor_->resetInterrupt();
285  }
286  std::string query_session{""};
287  std::string query_str{"N/A"};
288  std::string query_submitted_time{""};
289  // gather necessary query's info
290  if (query_state_ != nullptr && query_state_->getConstSessionInfo() != nullptr) {
291  query_session = query_state_->getConstSessionInfo()->get_session_id();
292  query_str = query_state_->getQueryStr();
293  query_submitted_time = query_state_->getQuerySubmittedTime();
294  }
295 
296  bool acquire_spin_lock = false;
297  auto validate_or_explain_query =
298  just_explain_plan || eo.just_validate || eo.just_explain || eo.just_calcite_explain;
299  ScopeGuard clearRuntimeInterruptStatus = [this,
300  &query_session,
301  &render_info,
302  &eo,
303  &acquire_spin_lock,
304  &query_submitted_time,
305  &validate_or_explain_query] {
306  // reset the runtime query interrupt status after the end of query execution
307  if (!render_info && !query_session.empty() && eo.allow_runtime_query_interrupt &&
308  !validate_or_explain_query) {
309  executor_->clearQuerySessionStatus(
310  query_session, query_submitted_time, acquire_spin_lock);
311  }
312  };
313 
314  if (!render_info && eo.allow_runtime_query_interrupt && !validate_or_explain_query) {
315  // if we reach here, the current query which was waiting for an idle executor
316  // within the dispatch queue is now scheduled to the specific executor
317  // (not UNITARY_EXECUTOR)
318  // so we update the query session's status with the executor that takes this query
319  std::tie(query_session, query_str) = executor_->attachExecutorToQuerySession(
320  query_session, query_str, query_submitted_time);
321 
322  // For now we can run a single query at a time, so if other query is already
323  // executed by different executor (i.e., when we turn on parallel executor),
324  // we can check the possibility of running this query by using the spinlock.
325  // If it fails to acquire a lock, it sleeps {g_pending_query_interrupt_freq} ms.
326  // And then try to get the lock (and again and again...)
327  if (!query_session.empty()) {
328  while (executor_->execute_spin_lock_.test_and_set(std::memory_order_acquire)) {
329  try {
330  executor_->checkPendingQueryStatus(query_session);
331  } catch (QueryExecutionError& e) {
333  throw std::runtime_error(
334  "Query execution has been interrupted (pending query)");
335  }
336  throw e;
337  } catch (...) {
338  throw std::runtime_error("Checking pending query status failed: unknown error");
339  }
340  // here it fails to acquire the lock, so sleep...
341  std::this_thread::sleep_for(
342  std::chrono::milliseconds(g_pending_query_interrupt_freq));
343  }
344  acquire_spin_lock = true;
345  // now the query is going to be executed, so update the status as "RUNNING"
346  executor_->updateQuerySessionStatus(query_state_,
347  QuerySessionStatus::QueryStatus::RUNNING);
348  }
349  }
350  auto acquire_execute_mutex = [](Executor * executor) -> auto {
351  auto ret = executor->acquireExecuteMutex();
352  return ret;
353  };
354  // now we get the spinlock that means this query is ready to run by the executor
355  // so we acquire executor lock in here to make sure that this executor holds
356  // all necessary resources and at the same time protect them against other executor
357  auto lock = acquire_execute_mutex(executor_);
358 
359  if (!render_info && !query_session.empty() && eo.allow_runtime_query_interrupt &&
360  !validate_or_explain_query) {
361  // check whether this query session is "already" interrupted
362  // this case occurs when there is very short gap between being interrupted and
363  // taking the execute lock
364  // for instance, the session can fire multiple queries that other queries are waiting
365  // in the spinlock but the user can request the query interruption
366  // if so we have to remove "all" queries initiated by this session and we do in here
367  // without running the query
368  try {
369  executor_->checkPendingQueryStatus(query_session);
370  } catch (QueryExecutionError& e) {
372  throw std::runtime_error("Query execution has been interrupted (pending query)");
373  }
374  throw e;
375  } catch (...) {
376  throw std::runtime_error("Checking pending query status failed: unknown error");
377  }
378  }
379 
380  // Notify foreign tables to load prior to caching
382 
383  int64_t queue_time_ms = timer_stop(clock_begin);
384  ScopeGuard row_set_holder = [this] { cleanupPostExecution(); };
385  const auto phys_inputs = get_physical_inputs(cat_, &ra);
386  const auto phys_table_ids = get_physical_table_inputs(&ra);
387  executor_->setCatalog(&cat_);
388  executor_->setupCaching(phys_inputs, phys_table_ids);
389 
390  ScopeGuard restore_metainfo_cache = [this] { executor_->clearMetaInfoCache(); };
391  auto ed_seq = RaExecutionSequence(&ra);
392 
393  if (just_explain_plan) {
394  std::stringstream ss;
395  std::vector<const RelAlgNode*> nodes;
396  for (size_t i = 0; i < ed_seq.size(); i++) {
397  nodes.emplace_back(ed_seq.getDescriptor(i)->getBody());
398  }
399  size_t ctr = nodes.size();
400  size_t tab_ctr = 0;
401  for (auto& body : boost::adaptors::reverse(nodes)) {
402  const auto index = ctr--;
403  const auto tabs = std::string(tab_ctr++, '\t');
404  CHECK(body);
405  ss << tabs << std::to_string(index) << " : " << body->toString() << "\n";
406  if (auto sort = dynamic_cast<const RelSort*>(body)) {
407  ss << tabs << " : " << sort->getInput(0)->toString() << "\n";
408  }
409  if (dynamic_cast<const RelProject*>(body) ||
410  dynamic_cast<const RelCompound*>(body)) {
411  if (auto join = dynamic_cast<const RelLeftDeepInnerJoin*>(body->getInput(0))) {
412  ss << tabs << " : " << join->toString() << "\n";
413  }
414  }
415  }
416  const auto& subqueries = getSubqueries();
417  if (!subqueries.empty()) {
418  ss << "Subqueries: "
419  << "\n";
420  for (const auto& subquery : subqueries) {
421  const auto ra = subquery->getRelAlg();
422  ss << "\t" << ra->toString() << "\n";
423  }
424  }
425  auto rs = std::make_shared<ResultSet>(ss.str());
426  return {rs, {}};
427  }
428 
429  if (render_info) {
430  // set render to be non-insitu in certain situations.
431  if (!render_info->disallow_in_situ_only_if_final_ED_is_aggregate &&
432  ed_seq.size() > 1) {
433  // old logic
434  // disallow if more than one ED
435  render_info->setInSituDataIfUnset(false);
436  }
437  }
438 
439  if (eo.find_push_down_candidates) {
440  // this extra logic is mainly due to current limitations on multi-step queries
441  // and/or subqueries.
443  ed_seq, co, eo, render_info, queue_time_ms);
444  }
445  timer_setup.stop();
446 
447  // Dispatch the subqueries first
448  for (auto subquery : getSubqueries()) {
449  const auto subquery_ra = subquery->getRelAlg();
450  CHECK(subquery_ra);
451  if (subquery_ra->hasContextData()) {
452  continue;
453  }
454  // Execute the subquery and cache the result.
455  RelAlgExecutor ra_executor(executor_, cat_, query_state_);
456  RaExecutionSequence subquery_seq(subquery_ra);
457  auto result = ra_executor.executeRelAlgSeq(subquery_seq, co, eo, nullptr, 0);
458  subquery->setExecutionResult(std::make_shared<ExecutionResult>(result));
459  }
460  return executeRelAlgSeq(ed_seq, co, eo, render_info, queue_time_ms);
461 }
int32_t getErrorCode() const
Definition: ErrorHandling.h:55
void prepare_foreign_table_for_execution(const RelAlgNode &ra_node, const Catalog_Namespace::Catalog &catalog)
static const int32_t ERR_INTERRUPTED
Definition: Execute.h:1121
ExecutionResult executeRelAlgSeq(const RaExecutionSequence &seq, const CompilationOptions &co, const ExecutionOptions &eo, RenderInfo *render_info, const int64_t queue_time_ms, const bool with_existing_temp_tables=false)
unsigned g_pending_query_interrupt_freq
Definition: Execute.cpp:118
ExecutionResult executeRelAlgQueryWithFilterPushDown(const RaExecutionSequence &seq, const CompilationOptions &co, const ExecutionOptions &eo, RenderInfo *render_info, const int64_t queue_time_ms)
std::string join(T const &container, std::string const &delim)
DEVICE void sort(ARGS &&...args)
Definition: gpu_enabled.h:105
TypeR::rep timer_stop(Type clock_begin)
Definition: measure.h:48
bool g_enable_dynamic_watchdog
Definition: Execute.cpp:77
const std::vector< std::shared_ptr< RexSubQuery > > & getSubqueries() const noexcept
std::string to_string(char const *&&v)
#define INJECT_TIMER(DESC)
Definition: measure.h:93
A container for relational algebra descriptors defining the execution order for a relational algebra ...
const Catalog_Namespace::Catalog & cat_
ExecutionResult executeRelAlgQueryNoRetry(const CompilationOptions &co, const ExecutionOptions &eo, const bool just_explain_plan, RenderInfo *render_info)
std::shared_ptr< const query_state::QueryState > query_state_
#define CHECK(condition)
Definition: Logger.h:206
#define DEBUG_TIMER(name)
Definition: Logger.h:322
std::unique_ptr< RelAlgDagBuilder > query_dag_
void cleanupPostExecution()
std::unordered_set< PhysicalInput > get_physical_inputs(const RelAlgNode *ra)
DEVICE void reverse(ARGS &&...args)
Definition: gpu_enabled.h:96
Executor * executor_
std::unordered_set< int > get_physical_table_inputs(const RelAlgNode *ra)
Type timer_start()
Definition: measure.h:42

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

QueryStepExecutionResult RelAlgExecutor::executeRelAlgQuerySingleStep ( const RaExecutionSequence seq,
const size_t  step_idx,
const CompilationOptions co,
const ExecutionOptions eo,
RenderInfo render_info 
)

Definition at line 500 of file RelAlgExecutor.cpp.

References ExecutionOptions::allow_loop_joins, ExecutionOptions::allow_multifrag, ExecutionOptions::allow_runtime_query_interrupt, CHECK, CHECK_EQ, anonymous_namespace{RelAlgExecutor.cpp}::check_sort_node_source_constraint(), createSortInputWorkUnit(), ExecutionOptions::dynamic_watchdog_time_limit, executeRelAlgSubSeq(), executor_, ExecutionOptions::executor_type, ExecutionOptions::find_push_down_candidates, RaExecutionSequence::getDescriptor(), ExecutionOptions::gpu_input_mem_limit_percent, INJECT_TIMER, ExecutionOptions::jit_debug, ExecutionOptions::just_calcite_explain, ExecutionOptions::just_explain, ExecutionOptions::just_validate, anonymous_namespace{RelAlgExecutor.cpp}::node_is_aggregate(), ExecutionOptions::output_columnar_hint, ExecutionOptions::pending_query_interrupt_freq, post_execution_callback_, queue_time_ms_, Reduce, run_benchmark_import::result, ExecutionOptions::running_query_interrupt_freq, GroupByAndAggregate::shard_count_for_top_groups(), gpu_enabled::sort(), Union, VLOG, ExecutionOptions::with_dynamic_watchdog, and ExecutionOptions::with_watchdog.

505  {
506  INJECT_TIMER(executeRelAlgQueryStep);
507 
508  auto exe_desc_ptr = seq.getDescriptor(step_idx);
509  CHECK(exe_desc_ptr);
510  const auto sort = dynamic_cast<const RelSort*>(exe_desc_ptr->getBody());
511 
512  size_t shard_count{0};
513  auto merge_type = [&shard_count](const RelAlgNode* body) -> MergeType {
514  return node_is_aggregate(body) && !shard_count ? MergeType::Reduce : MergeType::Union;
515  };
516 
517  if (sort) {
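518  check_sort_node_source_constraint(sort);
519  const auto source_work_unit = createSortInputWorkUnit(sort, eo);
520  shard_count = GroupByAndAggregate::shard_count_for_top_groups(
521  source_work_unit.exe_unit, *executor_->getCatalog());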
522  if (!shard_count) {
523  // No point in sorting on the leaf, only execute the input to the sort node.
524  CHECK_EQ(size_t(1), sort->inputCount());
525  const auto source = sort->getInput(0);
526  if (sort->collationCount() || node_is_aggregate(source)) {
527  auto temp_seq = RaExecutionSequence(std::make_unique<RaExecutionDesc>(source));
528  CHECK_EQ(temp_seq.size(), size_t(1));
529  ExecutionOptions eo_copy = {
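530  eo.output_columnar_hint,
531  eo.allow_multifrag,
532  eo.just_explain,
533  eo.allow_loop_joins,
534  eo.with_watchdog,
535  eo.jit_debug,
536  eo.just_validate || sort->isEmptyResult(),
537  eo.with_dynamic_watchdog,
538  eo.dynamic_watchdog_time_limit,
539  eo.find_push_down_candidates,
540  eo.just_calcite_explain,
541  eo.gpu_input_mem_limit_percent,
542  eo.allow_runtime_query_interrupt,
543  eo.running_query_interrupt_freq,
544  eo.pending_query_interrupt_freq,
545  eo.executor_type,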
546  };
547  // Use subseq to avoid clearing existing temporary tables
548  return {
549  executeRelAlgSubSeq(temp_seq, std::make_pair(0, 1), co, eo_copy, nullptr, 0),
550  merge_type(source),
551  source->getId(),
552  false};
553  }
554  }
555  }
556  QueryStepExecutionResult result{
557  executeRelAlgSubSeq(seq,
558  std::make_pair(step_idx, step_idx + 1),
559  co,
560  eo,
561  render_info,
562  queue_time_ms_),
563  merge_type(exe_desc_ptr->getBody()),
564  exe_desc_ptr->getBody()->getId(),
565  false};
566  if (post_execution_callback_) {
567  VLOG(1) << "Running post execution callback.";
568  (*post_execution_callback_)();
569  }
570  return result;
571 }
int64_t queue_time_ms_
#define CHECK_EQ(x, y)
Definition: Logger.h:214
std::optional< std::function< void()> > post_execution_callback_
RaExecutionDesc * getDescriptor(size_t idx) const
DEVICE void sort(ARGS &&...args)
Definition: gpu_enabled.h:105
void check_sort_node_source_constraint(const RelSort *sort)
double running_query_interrupt_freq
ExecutorType executor_type
#define INJECT_TIMER(DESC)
Definition: measure.h:93
MergeType
A container for relational algebra descriptors defining the execution order for a relational algebra ...
ExecutionResult executeRelAlgSubSeq(const RaExecutionSequence &seq, const std::pair< size_t, size_t > interval, const CompilationOptions &co, const ExecutionOptions &eo, RenderInfo *render_info, const int64_t queue_time_ms)
unsigned pending_query_interrupt_freq
bool node_is_aggregate(const RelAlgNode *ra)
#define CHECK(condition)
Definition: Logger.h:206
double gpu_input_mem_limit_percent
unsigned dynamic_watchdog_time_limit
Executor * executor_
#define VLOG(n)
Definition: Logger.h:300
WorkUnit createSortInputWorkUnit(const RelSort *, const ExecutionOptions &eo)
static size_t shard_count_for_top_groups(const RelAlgExecutionUnit &ra_exe_unit, const Catalog_Namespace::Catalog &catalog)

+ Here is the call graph for this function:

ExecutionResult RelAlgExecutor::executeRelAlgQueryWithFilterPushDown ( const RaExecutionSequence seq,
const CompilationOptions co,
const ExecutionOptions eo,
RenderInfo render_info,
const int64_t  queue_time_ms 
)

Definition at line 145 of file JoinFilterPushDown.cpp.

References ExecutionOptions::allow_loop_joins, ExecutionOptions::allow_multifrag, ExecutionOptions::allow_runtime_query_interrupt, cat_, CHECK, ExecutionOptions::dynamic_watchdog_time_limit, executeRelAlgSeq(), executor_, ExecutionOptions::find_push_down_candidates, getSubqueries(), ExecutionOptions::gpu_input_mem_limit_percent, ExecutionOptions::jit_debug, ExecutionOptions::just_calcite_explain, ExecutionOptions::just_explain, ExecutionOptions::just_validate, ExecutionOptions::output_columnar_hint, ExecutionOptions::pending_query_interrupt_freq, run_benchmark_import::result, ExecutionOptions::running_query_interrupt_freq, RaExecutionSequence::size(), ExecutionOptions::with_dynamic_watchdog, and ExecutionOptions::with_watchdog.

Referenced by executeRelAlgQueryNoRetry().

150  {
151  // we currently do not fully support filter push down with
152  // multi-step execution and/or with subqueries
153  // TODO(Saman): add proper caching to enable filter push down for all cases
154  const auto& subqueries = getSubqueries();
155  if (seq.size() > 1 || !subqueries.empty()) {
156  if (eo.just_calcite_explain) {
157  return ExecutionResult(std::vector<PushedDownFilterInfo>{},
158  eo.find_push_down_candidates);
159  }
160  const ExecutionOptions eo_modified{eo.output_columnar_hint,
161  eo.allow_multifrag,
162  eo.just_explain,
163  eo.allow_loop_joins,
164  eo.with_watchdog,
165  eo.jit_debug,
166  eo.just_validate,
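167  eo.with_dynamic_watchdog,
168  eo.dynamic_watchdog_time_limit,
169  /*find_push_down_candidates=*/false,
170  /*just_calcite_explain=*/false,
171  eo.gpu_input_mem_limit_percent,
172  eo.allow_runtime_query_interrupt,
173  eo.running_query_interrupt_freq,
174  eo.pending_query_interrupt_freq};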
175 
176  // Dispatch the subqueries first
177  for (auto subquery : subqueries) {
178  // Execute the subquery and cache the result.
179  RelAlgExecutor ra_executor(executor_, cat_);
180  const auto subquery_ra = subquery->getRelAlg();
181  CHECK(subquery_ra);
182  RaExecutionSequence subquery_seq(subquery_ra);
183  auto result = ra_executor.executeRelAlgSeq(subquery_seq, co, eo, nullptr, 0);
184  subquery->setExecutionResult(std::make_shared<ExecutionResult>(result));
185  }
186  return executeRelAlgSeq(seq, co, eo_modified, render_info, queue_time_ms);
187  }
188  // else
189 
190  // Dispatch the subqueries first
191  for (auto subquery : subqueries) {
192  // Execute the subquery and cache the result.
193  RelAlgExecutor ra_executor(executor_, cat_);
194  const auto subquery_ra = subquery->getRelAlg();
195  CHECK(subquery_ra);
196  RaExecutionSequence subquery_seq(subquery_ra);
197  auto result = ra_executor.executeRelAlgSeq(subquery_seq, co, eo, nullptr, 0);
198  subquery->setExecutionResult(std::make_shared<ExecutionResult>(result));
199  }
200  return executeRelAlgSeq(seq, co, eo, render_info, queue_time_ms);
201 }
ExecutionResult executeRelAlgSeq(const RaExecutionSequence &seq, const CompilationOptions &co, const ExecutionOptions &eo, RenderInfo *render_info, const int64_t queue_time_ms, const bool with_existing_temp_tables=false)
const std::vector< std::shared_ptr< RexSubQuery > > & getSubqueries() const noexcept
double running_query_interrupt_freq
A container for relational algebra descriptors defining the execution order for a relational algebra ...
const Catalog_Namespace::Catalog & cat_
unsigned pending_query_interrupt_freq
#define CHECK(condition)
Definition: Logger.h:206
double gpu_input_mem_limit_percent
unsigned dynamic_watchdog_time_limit
Executor * executor_

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

ExecutionResult RelAlgExecutor::executeRelAlgSeq ( const RaExecutionSequence seq,
const CompilationOptions co,
const ExecutionOptions eo,
RenderInfo render_info,
const int64_t  queue_time_ms,
const bool  with_existing_temp_tables = false 
)

Definition at line 590 of file RelAlgExecutor.cpp.

References cat_, CHECK, CHECK_GE, DEBUG_TIMER, RaExecutionSequence::empty(), executeRelAlgStep(), executor_, ExecutionOptions::executor_type, g_enable_interop, RaExecutionDesc::getBody(), RaExecutionSequence::getDescriptor(), i, logger::INFO, INJECT_TIMER, ExecutionOptions::just_explain, LOG, now_, RaExecutionSequence::size(), gpu_enabled::swap(), target_exprs_owned_, temporary_tables_, and VLOG.

Referenced by executeRelAlgQueryNoRetry(), and executeRelAlgQueryWithFilterPushDown().

595  {
596  INJECT_TIMER(executeRelAlgSeq);
597  auto timer = DEBUG_TIMER(__func__);
598  if (!with_existing_temp_tables) {
599  temporary_tables_.clear();
600  }
601  decltype(target_exprs_owned_)().swap(target_exprs_owned_);
602  executor_->catalog_ = &cat_;
603  executor_->temporary_tables_ = &temporary_tables_;
604 
605  time(&now_);
606  CHECK(!seq.empty());
607 
608  auto get_descriptor_count = [&seq, &eo]() -> size_t {
609  if (eo.just_explain) {
610  if (dynamic_cast<const RelLogicalValues*>(seq.getDescriptor(0)->getBody())) {
611  // run the logical values descriptor to generate the result set, then the next
612  // descriptor to generate the explain
613  CHECK_GE(seq.size(), size_t(2));
614  return 2;
615  } else {
616  return 1;
617  }
618  } else {
619  return seq.size();
620  }
621  };
622 
623  const auto exec_desc_count = get_descriptor_count();
624 
625  for (size_t i = 0; i < exec_desc_count; i++) {
626  VLOG(1) << "Executing query step " << i;
627  // only render on the last step
628  try {
629  executeRelAlgStep(seq,
630  i,
631  co,
632  eo,
633  (i == exec_desc_count - 1) ? render_info : nullptr,
634  queue_time_ms);
635  } catch (const NativeExecutionError&) {
636  if (!g_enable_interop) {
637  throw;
638  }
639  auto eo_extern = eo;
640  eo_extern.executor_type = ::ExecutorType::Extern;
641  auto exec_desc_ptr = seq.getDescriptor(i);
642  const auto body = exec_desc_ptr->getBody();
643  const auto compound = dynamic_cast<const RelCompound*>(body);
644  if (compound && (compound->getGroupByCount() || compound->isAggregate())) {
645  LOG(INFO) << "Also failed to run the query using interoperability";
646  throw;
647  }
648  executeRelAlgStep(seq,
649  i,
650  co,
651  eo_extern,
652  (i == exec_desc_count - 1) ? render_info : nullptr,
653  queue_time_ms);
654  }
655  }
656 
657  return seq.getDescriptor(exec_desc_count - 1)->getResult();
658 }
RaExecutionDesc * getDescriptor(size_t idx) const
ExecutionResult executeRelAlgSeq(const RaExecutionSequence &seq, const CompilationOptions &co, const ExecutionOptions &eo, RenderInfo *render_info, const int64_t queue_time_ms, const bool with_existing_temp_tables=false)
#define LOG(tag)
Definition: Logger.h:200
TemporaryTables temporary_tables_
#define CHECK_GE(x, y)
Definition: Logger.h:219
bool g_enable_interop
std::vector< std::shared_ptr< Analyzer::Expr > > target_exprs_owned_
ExecutorType executor_type
#define INJECT_TIMER(DESC)
Definition: measure.h:93
const Catalog_Namespace::Catalog & cat_
void executeRelAlgStep(const RaExecutionSequence &seq, const size_t step_idx, const CompilationOptions &, const ExecutionOptions &, RenderInfo *, const int64_t queue_time_ms)
#define CHECK(condition)
Definition: Logger.h:206
#define DEBUG_TIMER(name)
Definition: Logger.h:322
const RelAlgNode * getBody() const
Executor * executor_
DEVICE void swap(ARGS &&...args)
Definition: gpu_enabled.h:114
#define VLOG(n)
Definition: Logger.h:300

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void RelAlgExecutor::executeRelAlgStep ( const RaExecutionSequence seq,
const size_t  step_idx,
const CompilationOptions co,
const ExecutionOptions eo,
RenderInfo render_info,
const int64_t  queue_time_ms 
)
private

Definition at line 685 of file RelAlgExecutor.cpp.

References addTemporaryTable(), ExecutionOptions::allow_loop_joins, ExecutionOptions::allow_multifrag, ExecutionOptions::allow_runtime_query_interrupt, cat_, CHECK, DEBUG_TIMER, ExecutionOptions::dynamic_watchdog_time_limit, executeAggregate(), executeCompound(), executeDelete(), executeFilter(), executeLogicalValues(), executeModify(), executeProject(), executeSort(), executeTableFunction(), executeUnion(), executeUpdate(), executor_, ExecutionOptions::executor_type, logger::FATAL, ExecutionOptions::find_push_down_candidates, g_cluster, g_enable_union, g_skip_intermediate_count, RaExecutionSequence::getDescriptor(), RelAlgNode::getId(), ExecutionOptions::gpu_input_mem_limit_percent, handleNop(), INJECT_TIMER, ExecutionOptions::jit_debug, ExecutionOptions::just_calcite_explain, ExecutionOptions::just_explain, ExecutionOptions::just_validate, LOG, ExecutionOptions::outer_fragment_indices, ExecutionOptions::output_columnar_hint, ExecutionOptions::pending_query_interrupt_freq, anonymous_namespace{RelAlgExecutor.cpp}::prepare_foreign_table_for_execution(), WindowProjectNodeContext::reset(), ExecutionOptions::running_query_interrupt_freq, gpu_enabled::sort(), VLOG, ExecutionOptions::with_dynamic_watchdog, and ExecutionOptions::with_watchdog.

Referenced by executeRelAlgSeq(), and executeRelAlgSubSeq().

690  {
691  INJECT_TIMER(executeRelAlgStep);
692  auto timer = DEBUG_TIMER(__func__);
693  WindowProjectNodeContext::reset(executor_);
694  auto exec_desc_ptr = seq.getDescriptor(step_idx);
695  CHECK(exec_desc_ptr);
696  auto& exec_desc = *exec_desc_ptr;
697  const auto body = exec_desc.getBody();
698  if (body->isNop()) {
699  handleNop(exec_desc);
700  return;
701  }
702  const ExecutionOptions eo_work_unit{
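703  eo.output_columnar_hint,
704  eo.allow_multifrag,
705  eo.just_explain,
706  eo.allow_loop_joins,
707  eo.with_watchdog && (step_idx == 0 || dynamic_cast<const RelProject*>(body)),
708  eo.jit_debug,
709  eo.just_validate,
710  eo.with_dynamic_watchdog,
711  eo.dynamic_watchdog_time_limit,
712  eo.find_push_down_candidates,
713  eo.just_calcite_explain,
714  eo.gpu_input_mem_limit_percent,
715  eo.allow_runtime_query_interrupt,
716  eo.running_query_interrupt_freq,
717  eo.pending_query_interrupt_freq,
718  eo.executor_type,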
719  step_idx == 0 ? eo.outer_fragment_indices : std::vector<size_t>()};
720 
721  // Notify foreign tables to load prior to execution
722  prepare_foreign_table_for_execution(*body, cat_);
723 
724  const auto compound = dynamic_cast<const RelCompound*>(body);
725  if (compound) {
726  if (compound->isDeleteViaSelect()) {
727  executeDelete(compound, co, eo_work_unit, queue_time_ms);
728  } else if (compound->isUpdateViaSelect()) {
729  executeUpdate(compound, co, eo_work_unit, queue_time_ms);
730  } else {
731  exec_desc.setResult(
732  executeCompound(compound, co, eo_work_unit, render_info, queue_time_ms));
733  VLOG(3) << "Returned from executeCompound(), addTemporaryTable("
734  << static_cast<int>(-compound->getId()) << ", ...)"
735  << " exec_desc.getResult().getDataPtr()->rowCount()="
736  << exec_desc.getResult().getDataPtr()->rowCount();
737  if (exec_desc.getResult().isFilterPushDownEnabled()) {
738  return;
739  }
740  addTemporaryTable(-compound->getId(), exec_desc.getResult().getDataPtr());
741  }
742  return;
743  }
744  const auto project = dynamic_cast<const RelProject*>(body);
745  if (project) {
746  if (project->isDeleteViaSelect()) {
747  executeDelete(project, co, eo_work_unit, queue_time_ms);
748  } else if (project->isUpdateViaSelect()) {
749  executeUpdate(project, co, eo_work_unit, queue_time_ms);
750  } else {
751  std::optional<size_t> prev_count;
752  // Disabling the intermediate count optimization in distributed, as the previous
753  // execution descriptor will likely not hold the aggregated result.
754  if (g_skip_intermediate_count && step_idx > 0 && !g_cluster) {
755  auto prev_exec_desc = seq.getDescriptor(step_idx - 1);
756  CHECK(prev_exec_desc);
757  RelAlgNode const* prev_body = prev_exec_desc->getBody();
758  // This optimization needs to be restricted in its application for UNION, which
759  // can have 2 input nodes in which neither should restrict the count of the other.
760  // However some non-UNION queries are measurably slower with this restriction, so
761  // it is only applied when g_enable_union is true.
762  bool const parent_check =
763  !g_enable_union || project->getInput(0)->getId() == prev_body->getId();
764  // If the previous node produced a reliable count, skip the pre-flight count
765  if (parent_check && (dynamic_cast<const RelCompound*>(prev_body) ||
766  dynamic_cast<const RelLogicalValues*>(prev_body))) {
767  const auto& prev_exe_result = prev_exec_desc->getResult();
768  const auto prev_result = prev_exe_result.getRows();
769  if (prev_result) {
770  prev_count = prev_result->rowCount();
771  VLOG(3) << "Setting output row count for projection node to previous node ("
772  << prev_exec_desc->getBody()->toString() << ") to " << *prev_count;
773  }
774  }
775  }
776  exec_desc.setResult(executeProject(
777  project, co, eo_work_unit, render_info, queue_time_ms, prev_count));
778  VLOG(3) << "Returned from executeProject(), addTemporaryTable("
779  << static_cast<int>(-project->getId()) << ", ...)"
780  << " exec_desc.getResult().getDataPtr()->rowCount()="
781  << exec_desc.getResult().getDataPtr()->rowCount();
782  if (exec_desc.getResult().isFilterPushDownEnabled()) {
783  return;
784  }
785  addTemporaryTable(-project->getId(), exec_desc.getResult().getDataPtr());
786  }
787  return;
788  }
789  const auto aggregate = dynamic_cast<const RelAggregate*>(body);
790  if (aggregate) {
791  exec_desc.setResult(
792  executeAggregate(aggregate, co, eo_work_unit, render_info, queue_time_ms));
793  addTemporaryTable(-aggregate->getId(), exec_desc.getResult().getDataPtr());
794  return;
795  }
796  const auto filter = dynamic_cast<const RelFilter*>(body);
797  if (filter) {
798  exec_desc.setResult(
799  executeFilter(filter, co, eo_work_unit, render_info, queue_time_ms));
800  addTemporaryTable(-filter->getId(), exec_desc.getResult().getDataPtr());
801  return;
802  }
803  const auto sort = dynamic_cast<const RelSort*>(body);
804  if (sort) {
805  exec_desc.setResult(executeSort(sort, co, eo_work_unit, render_info, queue_time_ms));
806  if (exec_desc.getResult().isFilterPushDownEnabled()) {
807  return;
808  }
809  addTemporaryTable(-sort->getId(), exec_desc.getResult().getDataPtr());
810  return;
811  }
812  const auto logical_values = dynamic_cast<const RelLogicalValues*>(body);
813  if (logical_values) {
814  exec_desc.setResult(executeLogicalValues(logical_values, eo_work_unit));
815  addTemporaryTable(-logical_values->getId(), exec_desc.getResult().getDataPtr());
816  return;
817  }
818  const auto modify = dynamic_cast<const RelModify*>(body);
819  if (modify) {
820  exec_desc.setResult(executeModify(modify, eo_work_unit));
821  return;
822  }
823  const auto logical_union = dynamic_cast<const RelLogicalUnion*>(body);
824  if (logical_union) {
825  exec_desc.setResult(
826  executeUnion(logical_union, seq, co, eo_work_unit, render_info, queue_time_ms));
827  addTemporaryTable(-logical_union->getId(), exec_desc.getResult().getDataPtr());
828  return;
829  }
830  const auto table_func = dynamic_cast<const RelTableFunction*>(body);
831  if (table_func) {
832  exec_desc.setResult(
833  executeTableFunction(table_func, co, eo_work_unit, queue_time_ms));
834  addTemporaryTable(-table_func->getId(), exec_desc.getResult().getDataPtr());
835  return;
836  }
837  LOG(FATAL) << "Unhandled body type: " << body->toString();
838 }
ExecutionResult executeAggregate(const RelAggregate *aggregate, const CompilationOptions &co, const ExecutionOptions &eo, RenderInfo *render_info, const int64_t queue_time_ms)
RaExecutionDesc * getDescriptor(size_t idx) const
void prepare_foreign_table_for_execution(const RelAlgNode &ra_node, const Catalog_Namespace::Catalog &catalog)
bool g_skip_intermediate_count
#define LOG(tag)
Definition: Logger.h:200
std::vector< size_t > outer_fragment_indices
DEVICE void sort(ARGS &&...args)
Definition: gpu_enabled.h:105
ExecutionResult executeModify(const RelModify *modify, const ExecutionOptions &eo)
void addTemporaryTable(const int table_id, const ResultSetPtr &result)
ExecutionResult executeLogicalValues(const RelLogicalValues *, const ExecutionOptions &)
ExecutionResult executeFilter(const RelFilter *, const CompilationOptions &, const ExecutionOptions &, RenderInfo *, const int64_t queue_time_ms)
double running_query_interrupt_freq
void handleNop(RaExecutionDesc &ed)
unsigned getId() const
ExecutorType executor_type
#define INJECT_TIMER(DESC)
Definition: measure.h:93
static void reset(Executor *executor)
void executeUpdate(const RelAlgNode *node, const CompilationOptions &co, const ExecutionOptions &eo, const int64_t queue_time_ms)
const Catalog_Namespace::Catalog & cat_
void executeRelAlgStep(const RaExecutionSequence &seq, const size_t step_idx, const CompilationOptions &, const ExecutionOptions &, RenderInfo *, const int64_t queue_time_ms)
unsigned pending_query_interrupt_freq
ExecutionResult executeTableFunction(const RelTableFunction *, const CompilationOptions &, const ExecutionOptions &, const int64_t queue_time_ms)
ExecutionResult executeUnion(const RelLogicalUnion *, const RaExecutionSequence &, const CompilationOptions &, const ExecutionOptions &, RenderInfo *, const int64_t queue_time_ms)
ExecutionResult executeSort(const RelSort *, const CompilationOptions &, const ExecutionOptions &, RenderInfo *, const int64_t queue_time_ms)
ExecutionResult executeProject(const RelProject *, const CompilationOptions &, const ExecutionOptions &, RenderInfo *, const int64_t queue_time_ms, const std::optional< size_t > previous_count)
#define CHECK(condition)
Definition: Logger.h:206
#define DEBUG_TIMER(name)
Definition: Logger.h:322
ExecutionResult executeCompound(const RelCompound *, const CompilationOptions &, const ExecutionOptions &, RenderInfo *, const int64_t queue_time_ms)
double gpu_input_mem_limit_percent
bool g_enable_union
bool g_cluster
unsigned dynamic_watchdog_time_limit
void executeDelete(const RelAlgNode *node, const CompilationOptions &co, const ExecutionOptions &eo_in, const int64_t queue_time_ms)
Executor * executor_
#define VLOG(n)
Definition: Logger.h:300

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

ExecutionResult RelAlgExecutor::executeRelAlgSubSeq ( const RaExecutionSequence seq,
const std::pair< size_t, size_t >  interval,
const CompilationOptions co,
const ExecutionOptions eo,
RenderInfo render_info,
const int64_t  queue_time_ms 
)

Definition at line 660 of file RelAlgExecutor.cpp.

References cat_, executeRelAlgStep(), executor_, RaExecutionSequence::getDescriptor(), i, INJECT_TIMER, now_, and temporary_tables_.

Referenced by executeRelAlgQuerySingleStep().

666  {
667  INJECT_TIMER(executeRelAlgSubSeq);
668  executor_->catalog_ = &cat_;
669  executor_->temporary_tables_ = &temporary_tables_;
670 
671  time(&now_);
672  for (size_t i = interval.first; i < interval.second; i++) {
673  // only render on the last step
674  executeRelAlgStep(seq,
675  i,
676  co,
677  eo,
678  (i == interval.second - 1) ? render_info : nullptr,
679  queue_time_ms);
680  }
681 
682  return seq.getDescriptor(interval.second - 1)->getResult();
683 }
RaExecutionDesc * getDescriptor(size_t idx) const
TemporaryTables temporary_tables_
#define INJECT_TIMER(DESC)
Definition: measure.h:93
const Catalog_Namespace::Catalog & cat_
ExecutionResult executeRelAlgSubSeq(const RaExecutionSequence &seq, const std::pair< size_t, size_t > interval, const CompilationOptions &co, const ExecutionOptions &eo, RenderInfo *render_info, const int64_t queue_time_ms)
void executeRelAlgStep(const RaExecutionSequence &seq, const size_t step_idx, const CompilationOptions &, const ExecutionOptions &, RenderInfo *, const int64_t queue_time_ms)
Executor * executor_

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

ExecutionResult RelAlgExecutor::executeSimpleInsert ( const Analyzer::Query insert_query)

Definition at line 2231 of file RelAlgExecutor.cpp.

References appendDatum(), DataBlockPtr::arraysPtr, cat_, CHECK, CHECK_EQ, checked_malloc(), Fragmenter_Namespace::InsertData::columnIds, CPU, Fragmenter_Namespace::InsertData::data, Fragmenter_Namespace::InsertData::databaseId, Catalog_Namespace::DBMetadata::dbId, Data_Namespace::DISK_LEVEL, executor_, import_export::fill_missing_columns(), g_cluster, get_column_descriptor(), SQLTypeInfo::get_compression(), SQLTypeInfo::get_elem_type(), Analyzer::Query::get_result_col_list(), Analyzer::Query::get_result_table_id(), anonymous_namespace{RelAlgExecutor.cpp}::get_shard_for_key(), SQLTypeInfo::get_size(), Analyzer::Query::get_targetlist(), SQLTypeInfo::get_type(), Catalog_Namespace::Catalog::getCurrentDB(), Catalog_Namespace::Catalog::getMetadataForDict(), Catalog_Namespace::Catalog::getMetadataForTable(), inline_fixed_encoding_null_val(), anonymous_namespace{RelAlgExecutor.cpp}::insert_one_dict_str(), is_null(), SQLTypeInfo::is_string(), kARRAY, kBIGINT, kBOOLEAN, kCAST, kCHAR, kDATE, kDECIMAL, kDOUBLE, kENCODING_DICT, kENCODING_NONE, kFLOAT, kINT, kLINESTRING, kMULTIPOLYGON, kNUMERIC, kPOINT, kPOLYGON, kSMALLINT, kTEXT, kTIME, kTIMESTAMP, kTINYINT, kVARCHAR, DataBlockPtr::numbersPtr, Fragmenter_Namespace::InsertData::numRows, anonymous_namespace{TypedDataAccessors.h}::put_null(), anonymous_namespace{TypedDataAccessors.h}::put_null_array(), DataBlockPtr::stringsPtr, Fragmenter_Namespace::InsertData::tableId, and to_string().

Referenced by Parser::InsertValuesStmt::execute().

2231  {
2232  // Note: We currently obtain an executor for this method, but we do not need it.
2233  // Therefore, we skip the executor state setup in the regular execution path. In the
2234  // future, we will likely want to use the executor to evaluate expressions in the insert
2235  // statement.
2236 
2237  const auto& targets = query.get_targetlist();
2238  const int table_id = query.get_result_table_id();
2239  const auto& col_id_list = query.get_result_col_list();
2240 
2241  std::vector<const ColumnDescriptor*> col_descriptors;
2242  std::vector<int> col_ids;
2243  std::unordered_map<int, std::unique_ptr<uint8_t[]>> col_buffers;
2244  std::unordered_map<int, std::vector<std::string>> str_col_buffers;
2245  std::unordered_map<int, std::vector<ArrayDatum>> arr_col_buffers;
2246 
2247  for (const int col_id : col_id_list) {
2248  const auto cd = get_column_descriptor(col_id, table_id, cat_);
2249  const auto col_enc = cd->columnType.get_compression();
2250  if (cd->columnType.is_string()) {
2251  switch (col_enc) {
2252  case kENCODING_NONE: {
2253  auto it_ok =
2254  str_col_buffers.insert(std::make_pair(col_id, std::vector<std::string>{}));
2255  CHECK(it_ok.second);
2256  break;
2257  }
2258  case kENCODING_DICT: {
2259  const auto dd = cat_.getMetadataForDict(cd->columnType.get_comp_param());
2260  CHECK(dd);
2261  const auto it_ok = col_buffers.emplace(
2262  col_id, std::make_unique<uint8_t[]>(cd->columnType.get_size()));
2263  CHECK(it_ok.second);
2264  break;
2265  }
2266  default:
2267  CHECK(false);
2268  }
2269  } else if (cd->columnType.is_geometry()) {
2270  auto it_ok =
2271  str_col_buffers.insert(std::make_pair(col_id, std::vector<std::string>{}));
2272  CHECK(it_ok.second);
2273  } else if (cd->columnType.is_array()) {
2274  auto it_ok =
2275  arr_col_buffers.insert(std::make_pair(col_id, std::vector<ArrayDatum>{}));
2276  CHECK(it_ok.second);
2277  } else {
2278  const auto it_ok = col_buffers.emplace(
2279  col_id,
2280  std::unique_ptr<uint8_t[]>(
2281  new uint8_t[cd->columnType.get_logical_size()]())); // changed to zero-init
2282  // the buffer
2283  CHECK(it_ok.second);
2284  }
2285  col_descriptors.push_back(cd);
2286  col_ids.push_back(col_id);
2287  }
2288  size_t col_idx = 0;
2289  Fragmenter_Namespace::InsertData insert_data;
2290  insert_data.databaseId = cat_.getCurrentDB().dbId;
2291  insert_data.tableId = table_id;
2292  for (auto target_entry : targets) {
2293  auto col_cv = dynamic_cast<const Analyzer::Constant*>(target_entry->get_expr());
2294  if (!col_cv) {
2295  auto col_cast = dynamic_cast<const Analyzer::UOper*>(target_entry->get_expr());
2296  CHECK(col_cast);
2297  CHECK_EQ(kCAST, col_cast->get_optype());
2298  col_cv = dynamic_cast<const Analyzer::Constant*>(col_cast->get_operand());
2299  }
2300  CHECK(col_cv);
2301  const auto cd = col_descriptors[col_idx];
2302  auto col_datum = col_cv->get_constval();
2303  auto col_type = cd->columnType.get_type();
2304  uint8_t* col_data_bytes{nullptr};
2305  if (!cd->columnType.is_array() && !cd->columnType.is_geometry() &&
2306  (!cd->columnType.is_string() ||
2307  cd->columnType.get_compression() == kENCODING_DICT)) {
2308  const auto col_data_bytes_it = col_buffers.find(col_ids[col_idx]);
2309  CHECK(col_data_bytes_it != col_buffers.end());
2310  col_data_bytes = col_data_bytes_it->second.get();
2311  }
2312  switch (col_type) {
2313  case kBOOLEAN: {
2314  auto col_data = reinterpret_cast<int8_t*>(col_data_bytes);
2315  auto null_bool_val =
2316  col_datum.boolval == inline_fixed_encoding_null_val(cd->columnType);
2317  *col_data = col_cv->get_is_null() || null_bool_val
2318  ? inline_fixed_encoding_null_val(cd->columnType)
2319  : (col_datum.boolval ? 1 : 0);
2320  break;
2321  }
2322  case kTINYINT: {
2323  auto col_data = reinterpret_cast<int8_t*>(col_data_bytes);
2324  *col_data = col_cv->get_is_null() ? inline_fixed_encoding_null_val(cd->columnType)
2325  : col_datum.tinyintval;
2326  break;
2327  }
2328  case kSMALLINT: {
2329  auto col_data = reinterpret_cast<int16_t*>(col_data_bytes);
2330  *col_data = col_cv->get_is_null() ? inline_fixed_encoding_null_val(cd->columnType)
2331  : col_datum.smallintval;
2332  break;
2333  }
2334  case kINT: {
2335  auto col_data = reinterpret_cast<int32_t*>(col_data_bytes);
2336  *col_data = col_cv->get_is_null() ? inline_fixed_encoding_null_val(cd->columnType)
2337  : col_datum.intval;
2338  break;
2339  }
2340  case kBIGINT:
2341  case kDECIMAL:
2342  case kNUMERIC: {
2343  auto col_data = reinterpret_cast<int64_t*>(col_data_bytes);
2344  *col_data = col_cv->get_is_null() ? inline_fixed_encoding_null_val(cd->columnType)
2345  : col_datum.bigintval;
2346  break;
2347  }
2348  case kFLOAT: {
2349  auto col_data = reinterpret_cast<float*>(col_data_bytes);
2350  *col_data = col_datum.floatval;
2351  break;
2352  }
2353  case kDOUBLE: {
2354  auto col_data = reinterpret_cast<double*>(col_data_bytes);
2355  *col_data = col_datum.doubleval;
2356  break;
2357  }
2358  case kTEXT:
2359  case kVARCHAR:
2360  case kCHAR: {
2361  switch (cd->columnType.get_compression()) {
2362  case kENCODING_NONE:
2363  str_col_buffers[col_ids[col_idx]].push_back(
2364  col_datum.stringval ? *col_datum.stringval : "");
2365  break;
2366  case kENCODING_DICT: {
2367  switch (cd->columnType.get_size()) {
2368  case 1:
2370  reinterpret_cast<uint8_t*>(col_data_bytes), cd, col_cv, cat_);
2371  break;
2372  case 2:
2374  reinterpret_cast<uint16_t*>(col_data_bytes), cd, col_cv, cat_);
2375  break;
2376  case 4:
2378  reinterpret_cast<int32_t*>(col_data_bytes), cd, col_cv, cat_);
2379  break;
2380  default:
2381  CHECK(false);
2382  }
2383  break;
2384  }
2385  default:
2386  CHECK(false);
2387  }
2388  break;
2389  }
2390  case kTIME:
2391  case kTIMESTAMP:
2392  case kDATE: {
2393  auto col_data = reinterpret_cast<int64_t*>(col_data_bytes);
2394  *col_data = col_cv->get_is_null() ? inline_fixed_encoding_null_val(cd->columnType)
2395  : col_datum.bigintval;
2396  break;
2397  }
2398  case kARRAY: {
2399  const auto is_null = col_cv->get_is_null();
2400  const auto size = cd->columnType.get_size();
2401  const SQLTypeInfo elem_ti = cd->columnType.get_elem_type();
2402  // POINT coords: [un]compressed coords always need to be encoded, even if NULL
2403  const auto is_point_coords = (cd->isGeoPhyCol && elem_ti.get_type() == kTINYINT);
2404  if (is_null && !is_point_coords) {
2405  if (size > 0) {
2406  // NULL fixlen array: NULL_ARRAY sentinel followed by NULL sentinels
2407  if (elem_ti.is_string() && elem_ti.get_compression() == kENCODING_DICT) {
2408  throw std::runtime_error("Column " + cd->columnName +
2409  " doesn't accept NULL values");
2410  }
2411  int8_t* buf = (int8_t*)checked_malloc(size);
2412  put_null_array(static_cast<void*>(buf), elem_ti, "");
2413  for (int8_t* p = buf + elem_ti.get_size(); (p - buf) < size;
2414  p += elem_ti.get_size()) {
2415  put_null(static_cast<void*>(p), elem_ti, "");
2416  }
2417  arr_col_buffers[col_ids[col_idx]].emplace_back(size, buf, is_null);
2418  } else {
2419  arr_col_buffers[col_ids[col_idx]].emplace_back(0, nullptr, is_null);
2420  }
2421  break;
2422  }
2423  const auto l = col_cv->get_value_list();
2424  size_t len = l.size() * elem_ti.get_size();
2425  if (size > 0 && static_cast<size_t>(size) != len) {
2426  throw std::runtime_error("Array column " + cd->columnName + " expects " +
2427  std::to_string(size / elem_ti.get_size()) +
2428  " values, " + "received " + std::to_string(l.size()));
2429  }
2430  if (elem_ti.is_string()) {
2431  CHECK(kENCODING_DICT == elem_ti.get_compression());
2432  CHECK(4 == elem_ti.get_size());
2433 
2434  int8_t* buf = (int8_t*)checked_malloc(len);
2435  int32_t* p = reinterpret_cast<int32_t*>(buf);
2436 
2437  int elemIndex = 0;
2438  for (auto& e : l) {
2439  auto c = std::dynamic_pointer_cast<Analyzer::Constant>(e);
2440  CHECK(c);
2441  insert_one_dict_str(&p[elemIndex], cd->columnName, elem_ti, c.get(), cat_);
2442  elemIndex++;
2443  }
2444  arr_col_buffers[col_ids[col_idx]].push_back(ArrayDatum(len, buf, is_null));
2445 
2446  } else {
2447  int8_t* buf = (int8_t*)checked_malloc(len);
2448  int8_t* p = buf;
2449  for (auto& e : l) {
2450  auto c = std::dynamic_pointer_cast<Analyzer::Constant>(e);
2451  CHECK(c);
2452  p = appendDatum(p, c->get_constval(), elem_ti);
2453  }
2454  arr_col_buffers[col_ids[col_idx]].push_back(ArrayDatum(len, buf, is_null));
2455  }
2456  break;
2457  }
2458  case kPOINT:
2459  case kLINESTRING:
2460  case kPOLYGON:
2461  case kMULTIPOLYGON:
2462  str_col_buffers[col_ids[col_idx]].push_back(
2463  col_datum.stringval ? *col_datum.stringval : "");
2464  break;
2465  default:
2466  CHECK(false);
2467  }
2468  ++col_idx;
2469  }
2470  for (const auto& kv : col_buffers) {
2471  insert_data.columnIds.push_back(kv.first);
2472  DataBlockPtr p;
2473  p.numbersPtr = reinterpret_cast<int8_t*>(kv.second.get());
2474  insert_data.data.push_back(p);
2475  }
2476  for (auto& kv : str_col_buffers) {
2477  insert_data.columnIds.push_back(kv.first);
2478  DataBlockPtr p;
2479  p.stringsPtr = &kv.second;
2480  insert_data.data.push_back(p);
2481  }
2482  for (auto& kv : arr_col_buffers) {
2483  insert_data.columnIds.push_back(kv.first);
2484  DataBlockPtr p;
2485  p.arraysPtr = &kv.second;
2486  insert_data.data.push_back(p);
2487  }
2488  insert_data.numRows = 1;
2489  auto data_memory_holder = import_export::fill_missing_columns(&cat_, insert_data);
2490  const auto table_descriptor = cat_.getMetadataForTable(table_id);
2491  CHECK(table_descriptor);
2492  if (table_descriptor->nShards > 0) {
2493  auto shard = get_shard_for_key(table_descriptor, cat_, insert_data);
2494  CHECK(shard);
2495  shard->fragmenter->insertDataNoCheckpoint(insert_data);
2496  } else {
2497  table_descriptor->fragmenter->insertDataNoCheckpoint(insert_data);
2498  }
2499 
2500  // Ensure checkpoint happens across all shards, if not in distributed
2501  // mode (aggregator handles checkpointing in distributed mode)
2502  if (!g_cluster &&
2503  table_descriptor->persistenceLevel == Data_Namespace::MemoryLevel::DISK_LEVEL) {
2504  const_cast<Catalog_Namespace::Catalog&>(cat_).checkpointWithAutoRollback(table_id);
2505  }
2506 
2507  auto rs = std::make_shared<ResultSet>(TargetInfoList{},
2510  executor_->getRowSetMemoryOwner(),
2511  nullptr,
2512  0,
2513  0);
2514  std::vector<TargetMetaInfo> empty_targets;
2515  return {rs, empty_targets};
2516 }
#define CHECK_EQ(x, y)
Definition: Logger.h:214
HOST DEVICE int get_size() const
Definition: sqltypes.h:324
class for a per-database catalog. also includes metadata for the current database and the current use...
Definition: Catalog.h:102
Definition: sqltypes.h:48
std::vector< std::string > * stringsPtr
Definition: sqltypes.h:221
std::vector< ArrayDatum > * arraysPtr
Definition: sqltypes.h:222
std::vector< std::unique_ptr< TypedImportBuffer > > fill_missing_columns(const Catalog_Namespace::Catalog *cat, Fragmenter_Namespace::InsertData &insert_data)
Definition: Importer.cpp:5183
int64_t insert_one_dict_str(T *col_data, const std::string &columnName, const SQLTypeInfo &columnType, const Analyzer::Constant *col_cv, const Catalog_Namespace::Catalog &catalog)
Definition: sqldefs.h:49
std::vector< TargetInfo > TargetInfoList
HOST DEVICE SQLTypes get_type() const
Definition: sqltypes.h:314
std::string to_string(char const *&&v)
int tableId
identifies the database into which the data is being inserted
Definition: Fragmenter.h:61
std::conditional_t< is_cuda_compiler(), DeviceArrayDatum, HostArrayDatum > ArrayDatum
Definition: sqltypes.h:202
size_t numRows
a vector of column ids for the row(s) being inserted
Definition: Fragmenter.h:63
CONSTEXPR DEVICE bool is_null(const T &value)
const DBMetadata & getCurrentDB() const
Definition: Catalog.h:222
void * checked_malloc(const size_t size)
Definition: checked_alloc.h:45
const TableDescriptor * get_shard_for_key(const TableDescriptor *td, const Catalog_Namespace::Catalog &cat, const Fragmenter_Namespace::InsertData &data)
void put_null_array(void *ndptr, const SQLTypeInfo &ntype, const std::string col_name)
const Catalog_Namespace::Catalog & cat_
const DictDescriptor * getMetadataForDict(int dict_ref, bool loadDict=true) const
Definition: Catalog.cpp:1494
void put_null(void *ndptr, const SQLTypeInfo &ntype, const std::string col_name)
Definition: sqltypes.h:51
Definition: sqltypes.h:52
int8_t * appendDatum(int8_t *buf, Datum d, const SQLTypeInfo &ti)
Definition: sqltypes.h:941
HOST DEVICE EncodingType get_compression() const
Definition: sqltypes.h:322
std::vector< DataBlockPtr > data
the number of rows being inserted
Definition: Fragmenter.h:64
Definition: sqltypes.h:40
#define CHECK(condition)
Definition: Logger.h:206
int64_t inline_fixed_encoding_null_val(const SQL_TYPE_INFO &ti)
bool g_cluster
The data to be inserted using the fragment manager.
Definition: Fragmenter.h:59
Definition: sqltypes.h:44
const TableDescriptor * getMetadataForTable(const std::string &tableName, const bool populateFragmenter=true) const
Returns a pointer to a const TableDescriptor struct matching the provided tableName.
bool is_string() const
Definition: sqltypes.h:489
int8_t * numbersPtr
Definition: sqltypes.h:220
SQLTypeInfo get_elem_type() const
Definition: sqltypes.h:713
std::vector< int > columnIds
identifies the table into which the data is being inserted
Definition: Fragmenter.h:62
Executor * executor_
const ColumnDescriptor * get_column_descriptor(const int col_id, const int table_id, const Catalog_Namespace::Catalog &cat)
Definition: Execute.h:192

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

ExecutionResult RelAlgExecutor::executeSort ( const RelSort * sort,
const CompilationOptions & co,
const ExecutionOptions & eo,
RenderInfo * render_info,
const int64_t  queue_time_ms 
)
private

Definition at line 2548 of file RelAlgExecutor.cpp.

References SpeculativeTopNBlacklist::add(), CHECK, CHECK_EQ, anonymous_namespace{RelAlgExecutor.cpp}::check_sort_node_source_constraint(), RelSort::collationCount(), createSortInputWorkUnit(), DEBUG_TIMER, executeWorkUnit(), executor_, anonymous_namespace{RelAlgExecutor.cpp}::first_oe_is_desc(), g_cluster, anonymous_namespace{RelAlgExecutor.cpp}::get_order_entries(), RelAlgNode::getId(), RelAlgNode::getInput(), RelSort::getLimit(), RelSort::getOffset(), ExecutionResult::getRows(), RelSort::isEmptyResult(), leaf_results_, anonymous_namespace{RelAlgExecutor.cpp}::node_is_aggregate(), ExecutionOptions::output_columnar_hint, run_benchmark_import::result, RelAlgNode::setOutputMetainfo(), gpu_enabled::sort(), speculative_topn_blacklist_, and use_speculative_top_n().

Referenced by executeRelAlgStep().

2552  {
      // Body of RelAlgExecutor::executeSort: applies the ORDER BY / LIMIT / OFFSET
      // described by `sort` either to a result already stored in leaf_results_ for
      // this node id, or to the rows produced by executing the sort's input work
      // unit. If the execution throws SpeculativeTopNFailed, the group-by expression
      // is added to the blacklist and the query is executed a second time.
      // NOTE(review): the listing numbering skips 2554 and 2599; those source lines
      // are not visible in this extract.
2553  auto timer = DEBUG_TIMER(__func__);
2555  const auto source = sort->getInput(0);
2556  const bool is_aggregate = node_is_aggregate(source);
      // A result keyed by this node's id short-circuits execution of the input.
2557  auto it = leaf_results_.find(sort->getId());
2558  if (it != leaf_results_.end()) {
2559  // Add any transient string literals to the sdp on the agg
2560  const auto source_work_unit = createSortInputWorkUnit(sort, eo);
2561  executor_->addTransientStringLiterals(source_work_unit.exe_unit,
2562  executor_->row_set_mem_owner_);
2563  // Handle push-down for LIMIT for multi-node
2564  auto& aggregated_result = it->second;
2565  auto& result_rows = aggregated_result.rs;
2566  const size_t limit = sort->getLimit();
2567  const size_t offset = sort->getOffset();
2568  const auto order_entries = get_order_entries(sort);
      // Nothing is sorted or trimmed here unless a LIMIT or OFFSET is present.
2569  if (limit || offset) {
2570  if (!order_entries.empty()) {
      // limit + offset rows are requested from the sort so that the OFFSET prefix
      // dropped below and the LIMIT window kept after it are both covered.
2571  result_rows->sort(order_entries, limit + offset, executor_);
2572  }
2573  result_rows->dropFirstN(offset);
2574  if (limit) {
2575  result_rows->keepFirstN(limit);
2576  }
2577  }
2578  ExecutionResult result(result_rows, aggregated_result.targets_meta);
2579  sort->setOutputMetainfo(aggregated_result.targets_meta);
2580  return result;
2581  }
2582 
      // Written by the lambda below and read by the catch handler at the end, so the
      // failing group-by expression and its sort direction can be blacklisted.
2583  std::list<std::shared_ptr<Analyzer::Expr>> groupby_exprs;
2584  bool is_desc{false};
2585 
      // Executes the sort's input work unit and applies sort / limit / offset to the
      // resulting rows. Called once in the try block and again from the catch block.
2586  auto execute_sort_query = [this,
2587  sort,
2588  &source,
2589  &is_aggregate,
2590  &eo,
2591  &co,
2592  render_info,
2593  queue_time_ms,
2594  &groupby_exprs,
2595  &is_desc]() -> ExecutionResult {
2596  const auto source_work_unit = createSortInputWorkUnit(sort, eo);
2597  is_desc = first_oe_is_desc(source_work_unit.exe_unit.sort_info.order_entries);
      // Field-by-field copy of eo; the only altered entry is the one passed in place
      // of eo.just_validate, which is also true when the sort node reports an empty
      // result.
2598  ExecutionOptions eo_copy = {
2600  eo.allow_multifrag,
2601  eo.just_explain,
2602  eo.allow_loop_joins,
2603  eo.with_watchdog,
2604  eo.jit_debug,
2605  eo.just_validate || sort->isEmptyResult(),
2606  eo.with_dynamic_watchdog,
2607  eo.dynamic_watchdog_time_limit,
2608  eo.find_push_down_candidates,
2609  eo.just_calcite_explain,
2610  eo.gpu_input_mem_limit_percent,
2611  eo.allow_runtime_query_interrupt,
2612  eo.running_query_interrupt_freq,
2613  eo.pending_query_interrupt_freq,
2614  eo.executor_type,
2615  };
2616 
2617  groupby_exprs = source_work_unit.exe_unit.groupby_exprs;
2618  auto source_result = executeWorkUnit(source_work_unit,
2619  source->getOutputMetainfo(),
2620  is_aggregate,
2621  co,
2622  eo_copy,
2623  render_info,
2624  queue_time_ms);
      // Potential in-situ renders and filter push-down results are returned as-is,
      // without any sorting or trimming in this function.
2625  if (render_info && render_info->isPotentialInSituRender()) {
2626  return source_result;
2627  }
2628  if (source_result.isFilterPushDownEnabled()) {
2629  return source_result;
2630  }
2631  auto rows_to_sort = source_result.getRows();
      // Explain-only requests return the rows with empty target metadata.
2632  if (eo.just_explain) {
2633  return {rows_to_sort, {}};
2634  }
2635  const size_t limit = sort->getLimit();
2636  const size_t offset = sort->getOffset();
      // Sort here only when the node has collation fields, the result is not known to
      // be empty, and use_speculative_top_n() returns false for this unit.
2637  if (sort->collationCount() != 0 && !rows_to_sort->definitelyHasNoRows() &&
2638  !use_speculative_top_n(source_work_unit.exe_unit,
2639  rows_to_sort->getQueryMemDesc())) {
      // With no LIMIT, 0 is passed as top_n (presumably meaning "sort everything" —
      // confirm against ResultSet::sort).
2640  const size_t top_n = limit == 0 ? 0 : limit + offset;
2641  rows_to_sort->sort(
2642  source_work_unit.exe_unit.sort_info.order_entries, top_n, executor_);
2643  }
2644  if (limit || offset) {
      // Cluster mode without collation: when OFFSET covers every row, drop them all;
      // otherwise keep limit + offset rows and do not apply the OFFSET here
      // (presumably it is applied later on the aggregated result — TODO confirm).
2645  if (g_cluster && sort->collationCount() == 0) {
2646  if (offset >= rows_to_sort->rowCount()) {
2647  rows_to_sort->dropFirstN(offset);
2648  } else {
2649  rows_to_sort->keepFirstN(limit + offset);
2650  }
2651  } else {
2652  rows_to_sort->dropFirstN(offset);
2653  if (limit) {
2654  rows_to_sort->keepFirstN(limit);
2655  }
2656  }
2657  }
2658  return {rows_to_sort, source_result.getTargetsMeta()};
2659  };
2660 
2661  try {
2662  return execute_sort_query();
2663  } catch (const SpeculativeTopNFailed& e) {
      // Exactly one non-null group-by expression is expected on this path. It is
      // recorded in the blacklist together with the sort direction before the query
      // is run again; a failure of the second run is not caught here.
2664  CHECK_EQ(size_t(1), groupby_exprs.size());
2665  CHECK(groupby_exprs.front());
2666  speculative_topn_blacklist_.add(groupby_exprs.front(), is_desc);
2667  return execute_sort_query();
2668  }
2669 }
#define CHECK_EQ(x, y)
Definition: Logger.h:214
size_t getOffset() const
static SpeculativeTopNBlacklist speculative_topn_blacklist_
bool use_speculative_top_n(const RelAlgExecutionUnit &ra_exe_unit, const QueryMemoryDescriptor &query_mem_desc)
DEVICE void sort(ARGS &&...args)
Definition: gpu_enabled.h:105
void check_sort_node_source_constraint(const RelSort *sort)
std::unordered_map< unsigned, AggregatedResult > leaf_results_
unsigned getId() const
const std::shared_ptr< ResultSet > & getRows() const
const RelAlgNode * getInput(const size_t idx) const
bool node_is_aggregate(const RelAlgNode *ra)
bool isEmptyResult() const
size_t collationCount() const
size_t getLimit() const
#define CHECK(condition)
Definition: Logger.h:206
#define DEBUG_TIMER(name)
Definition: Logger.h:322
void setOutputMetainfo(const std::vector< TargetMetaInfo > &targets_metainfo) const
std::list< Analyzer::OrderEntry > get_order_entries(const RelSort *sort)
bool g_cluster
void add(const std::shared_ptr< Analyzer::Expr > expr, const bool desc)
ExecutionResult executeWorkUnit(const WorkUnit &work_unit, const std::vector< TargetMetaInfo > &targets_meta, const bool is_agg, const CompilationOptions &co_in, const ExecutionOptions &eo, RenderInfo *, const int64_t queue_time_ms, const std::optional< size_t > previous_count=std::nullopt)
Executor * executor_
WorkUnit createSortInputWorkUnit(const RelSort *, const ExecutionOptions &eo)
bool first_oe_is_desc(const std::list< Analyzer::OrderEntry > &order_entries)

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

ExecutionResult RelAlgExecutor::executeTableFunction ( const RelTableFunction * table_func,
const CompilationOptions & co_in,
const ExecutionOptions & eo,
const int64_t  queue_time_ms 
)
private

Definition at line 1807 of file RelAlgExecutor.cpp.

References cat_, CHECK, createTableFunctionWorkUnit(), DEBUG_TIMER, CompilationOptions::device_type, Executor::ERR_OUT_OF_GPU_MEM, executor_, g_cluster, g_enable_table_functions, get_table_infos(), QueryExecutionError::getErrorCode(), GPU, handlePersistentError(), INJECT_TIMER, ExecutionOptions::just_explain, and run_benchmark_import::result.

Referenced by executeRelAlgStep().

1810  {
      // Body of RelAlgExecutor::executeTableFunction: builds a table-function work
      // unit for `table_func`, runs it through executor_, and returns the rows paired
      // with the output metadata of the work unit's body node.
      // Throws std::runtime_error when running in cluster mode or when table
      // functions are disabled.
      // NOTE(review): the listing numbering skips 1811, 1834 and 1846-1847; those
      // source lines are not visible in this extract.
1812  auto timer = DEBUG_TIMER(__func__);
1813 
      // Local copy of the incoming compilation options.
1814  auto co = co_in;
1815 
1816  if (g_cluster) {
1817  throw std::runtime_error("Table functions not supported in distributed mode yet");
1818  }
1819  if (!g_enable_table_functions) {
1820  throw std::runtime_error("Table function support is disabled");
1821  }
1822  auto table_func_work_unit = createTableFunctionWorkUnit(
1823  table_func,
1824  eo.just_explain,
1825  /*is_gpu = */ co.device_type == ExecutorDeviceType::GPU);
1826  const auto body = table_func_work_unit.body;
1827  CHECK(body);
1828 
1829  const auto table_infos =
1830  get_table_infos(table_func_work_unit.exe_unit.input_descs, executor_);
1831 
      // Placeholder result with no targets and empty metadata; it is overwritten in
      // the try block below.
1832  ExecutionResult result{std::make_shared<ResultSet>(std::vector<TargetInfo>{},
1833  co.device_type,
1835  nullptr,
1836  executor_->getCatalog(),
1837  executor_->blockSize(),
1838  executor_->gridSize()),
1839  {}};
1840 
1841  try {
1842  result = {executor_->executeTableFunction(
1843  table_func_work_unit.exe_unit, table_infos, co, eo, cat_),
1844  body->getOutputMetainfo()};
1845  } catch (const QueryExecutionError& e) {
      // The visible handler rethrows as a std::runtime_error. Listing lines 1846-1847
      // are missing here (presumably error-code handling, given that the References
      // list names handlePersistentError and ERR_OUT_OF_GPU_MEM — TODO confirm).
1848  throw std::runtime_error("Table function ran out of memory during execution");
1849  }
1850  result.setQueueTime(queue_time_ms);
1851  return result;
1852 }
int32_t getErrorCode() const
Definition: ErrorHandling.h:55
TableFunctionWorkUnit createTableFunctionWorkUnit(const RelTableFunction *table_func, const bool just_explain, const bool is_gpu)
#define INJECT_TIMER(DESC)
Definition: measure.h:93
const Catalog_Namespace::Catalog & cat_
static const int32_t ERR_OUT_OF_GPU_MEM
Definition: Execute.h:1114
ExecutionResult executeTableFunction(const RelTableFunction *, const CompilationOptions &, const ExecutionOptions &, const int64_t queue_time_ms)
static void handlePersistentError(const int32_t error_code)
#define CHECK(condition)
Definition: Logger.h:206
std::vector< InputTableInfo > get_table_infos(const std::vector< InputDescriptor > &input_descs, Executor *executor)
#define DEBUG_TIMER(name)
Definition: Logger.h:322
bool g_cluster
Executor * executor_
bool g_enable_table_functions
Definition: Execute.cpp:105

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

ExecutionResult RelAlgExecutor::executeUnion ( const RelLogicalUnion * logical_union,
const RaExecutionSequence & seq,
const CompilationOptions & co,
const ExecutionOptions & eo,
RenderInfo * render_info,
const int64_t  queue_time_ms 
)
private

Definition at line 2019 of file RelAlgExecutor.cpp.

References RelLogicalUnion::checkForMatchingMetaInfoTypes(), createUnionWorkUnit(), DEBUG_TIMER, Default, executeWorkUnit(), RelAlgNode::getInput(), RelAlgNode::getOutputMetainfo(), RelAlgNode::hasInput(), RelLogicalUnion::isAll(), isGeometry(), CompilationOptions::makeCpuOnly(), query_dag_, and RelAlgNode::setOutputMetainfo().

Referenced by executeRelAlgStep().

2024  {
      // Body of RelAlgExecutor::executeUnion: validates a UNION node and executes it
      // as a single work unit with a default (empty) sort specification.
      // Throws std::runtime_error for UNION without ALL, for geo columns in the
      // output, and when a node other than RelProject, RelLogicalUnion or
      // RelAggregate consumes this union.
      // NOTE(review): the listing numbering skips 2049 (one argument of the final
      // executeWorkUnit call); that source line is not visible in this extract.
2025  auto timer = DEBUG_TIMER(__func__);
2026  if (!logical_union->isAll()) {
2027  throw std::runtime_error("UNION without ALL is not supported yet.");
2028  }
2029  // Will throw a std::runtime_error if types don't match.
2030  logical_union->checkForMatchingMetaInfoTypes();
      // The union's output metadata is taken from its first input.
2031  logical_union->setOutputMetainfo(logical_union->getInput(0)->getOutputMetainfo());
2032  if (boost::algorithm::any_of(logical_union->getOutputMetainfo(), isGeometry)) {
2033  throw std::runtime_error("UNION does not support subqueries with geo-columns.");
2034  }
2035  // Only Projections and Aggregates from a UNION are supported for now.
      // Visits every node of the query DAG and rejects any unsupported consumer of
      // this union.
2036  query_dag_->eachNode([logical_union](RelAlgNode const* node) {
2037  if (node->hasInput(logical_union) &&
2038  !shared::dynamic_castable_to_any<RelProject, RelLogicalUnion, RelAggregate>(
2039  node)) {
2040  throw std::runtime_error("UNION ALL not yet supported in this context.");
2041  }
2042  });
2043 
2044  auto work_unit =
2045  createUnionWorkUnit(logical_union, {{}, SortAlgorithm::Default, 0, 0}, eo);
      // `false` is passed for executeWorkUnit's is_agg parameter.
2046  return executeWorkUnit(work_unit,
2047  logical_union->getOutputMetainfo(),
2048  false,
2050  eo,
2051  render_info,
2052  queue_time_ms);
2053 }
bool isAll() const
void checkForMatchingMetaInfoTypes() const
static CompilationOptions makeCpuOnly(const CompilationOptions &in)
bool isGeometry(TargetMetaInfo const &target_meta_info)
const RelAlgNode * getInput(const size_t idx) const
WorkUnit createUnionWorkUnit(const RelLogicalUnion *, const SortInfo &, const ExecutionOptions &eo)
bool hasInput(const RelAlgNode *needle) const
#define DEBUG_TIMER(name)
Definition: Logger.h:322
void setOutputMetainfo(const std::vector< TargetMetaInfo > &targets_metainfo) const
std::unique_ptr< RelAlgDagBuilder > query_dag_
ExecutionResult executeWorkUnit(const WorkUnit &work_unit, const std::vector< TargetMetaInfo > &targets_meta, const bool is_agg, const CompilationOptions &co_in, const ExecutionOptions &eo, RenderInfo *, const int64_t queue_time_ms, const std::optional< size_t > previous_count=std::nullopt)
const std::vector< TargetMetaInfo > & getOutputMetainfo() const

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void RelAlgExecutor::executeUpdate ( const RelAlgNode * node,
const CompilationOptions & co,
const ExecutionOptions & eo,
const int64_t  queue_time_ms 
)
private

Definition at line 1498 of file RelAlgExecutor.cpp.

References CompilationOptions::allow_lazy_fetch, cat_, CHECK, CHECK_EQ, createCompoundWorkUnit(), createProjectWorkUnit(), DEBUG_TIMER, Default, dml_transaction_parameters_, executor_, CompilationOptions::filter_on_deleted_column, get_table_infos(), get_temporary_table(), QueryExecutionError::getErrorCode(), getErrorMessageFromCode(), Catalog_Namespace::Catalog::getMetadataForColumn(), CompilationOptions::hoist_literals, CacheInvalidator< CACHE_HOLDING_TYPES >::invalidateCaches(), CompilationOptions::makeCpuOnly(), ExecutionOptions::output_columnar_hint, post_execution_callback_, temporary_tables_, and StorageIOFacility::yieldUpdateCallback().

Referenced by executeRelAlgStep().

1501  {
      // Body of RelAlgExecutor::executeUpdate: runs an UPDATE whose parent node is a
      // RelCompound or a RelProject; any other node type throws std::runtime_error.
      // The body refers to its options parameters as co_in / eo_in.
      // NOTE(review): the listing numbering skips 1505, 1522 and 1533; those source
      // lines are not visible in this extract.
1502  CHECK(node);
1503  auto timer = DEBUG_TIMER(__func__);
1504 
1506 
1507  auto co = co_in;
1508  co.hoist_literals = false; // disable literal hoisting as it interferes with dict
1509  // encoded string updates
1510 
      // Shared driver for both supported node types: validates the target table, then
      // executes the work unit (rewritten first when the table is temporary).
1511  auto execute_update_for_node = [this, &co, &eo_in](const auto node,
1512  auto& work_unit,
1513  const bool is_aggregate) {
1514  auto table_descriptor = node->getModifiedTableDescriptor();
1515  CHECK(table_descriptor);
      // Variable-length updates require the table to have a deleted column.
1516  if (node->isVarlenUpdateRequired() && !table_descriptor->hasDeletedCol) {
1517  throw std::runtime_error(
1518  "UPDATE queries involving variable length columns are only supported on tables "
1519  "with the vacuum attribute set to 'delayed'");
1520  }
1521 
      // Listing line 1522 (the left-hand side of this statement) is missing;
      // presumably the new parameters object is assigned to
      // dml_transaction_parameters_, which is read below — TODO confirm.
1523  std::make_unique<UpdateTransactionParameters>(node->getModifiedTableDescriptor(),
1524  node->getTargetColumns(),
1525  node->getOutputMetainfo(),
1526  node->isVarlenUpdateRequired());
1527 
1528  const auto table_infos = get_table_infos(work_unit.exe_unit, executor_);
1529 
      // Executes one execution unit through executor_->executeUpdate and registers
      // the follow-up work as post_execution_callback_.
1530  auto execute_update_ra_exe_unit = [this, &co, &eo_in, &table_infos](
1531  const RelAlgExecutionUnit& ra_exe_unit,
1532  const bool is_aggregate) {
      // Listing line 1533 is missing; it presumably declares co_project, which is
      // used below (the References list names CompilationOptions::makeCpuOnly —
      // TODO confirm).
1534 
1535  auto eo = eo_in;
      // Temporary tables: request columnar output, disable lazy fetch, and do not
      // filter on the deleted column.
1536  if (dml_transaction_parameters_->tableIsTemporary()) {
1537  eo.output_columnar_hint = true;
1538  co_project.allow_lazy_fetch = false;
1539  co_project.filter_on_deleted_column =
1540  false; // project the entire delete column for columnar update
1541  }
1542 
1543  auto update_transaction_parameters =
1544  dynamic_cast<UpdateTransactionParameters*>(dml_transaction_parameters_.get());
1545  CHECK(update_transaction_parameters);
1546  auto update_callback = yieldUpdateCallback(*update_transaction_parameters);
1547  try {
1548  auto table_update_metadata =
1549  executor_->executeUpdate(ra_exe_unit,
1550  table_infos,
1551  co_project,
1552  eo,
1553  cat_,
1554  executor_->row_set_mem_owner_,
1555  update_callback,
1556  is_aggregate);
      // Finalizing the transaction and vacuuming are not run here; they are stored in
      // post_execution_callback_ for later invocation.
1557  post_execution_callback_ = [table_update_metadata, this]() {
1558  dml_transaction_parameters_->finalizeTransaction(cat_);
1559  TableOptimizer table_optimizer{
1560  dml_transaction_parameters_->getTableDescriptor(), executor_, cat_};
1561  table_optimizer.vacuumFragmentsAboveMinSelectivity(table_update_metadata);
1562  };
1563  } catch (const QueryExecutionError& e) {
      // Execution errors are rethrown as std::runtime_error carrying the message
      // mapped from the error code.
1564  throw std::runtime_error(getErrorMessageFromCode(e.getErrorCode()));
1565  }
1566  };
1567 
1568  if (dml_transaction_parameters_->tableIsTemporary()) {
1569  // hold owned target exprs during execution if rewriting
1570  auto query_rewrite = std::make_unique<QueryRewriter>(table_infos, executor_);
1571  // rewrite temp table updates to generate the full column by moving the where
1572  // clause into a case if such a rewrite is not possible, bail on the update
1573  // operation build an expr for the update target
1574  auto update_transaction_params =
1575  dynamic_cast<UpdateTransactionParameters*>(dml_transaction_parameters_.get());
1576  CHECK(update_transaction_params);
1577  const auto td = update_transaction_params->getTableDescriptor();
1578  CHECK(td);
1579  const auto update_column_names = update_transaction_params->getUpdateColumnNames();
      // Only a single updated column is supported on this path.
1580  if (update_column_names.size() > 1) {
1581  throw std::runtime_error(
1582  "Multi-column update is not yet supported for temporary tables.");
1583  }
1584 
1585  auto cd = cat_.getMetadataForColumn(td->tableId, update_column_names.front());
1586  CHECK(cd);
1587  auto projected_column_to_update =
1588  makeExpr<Analyzer::ColumnVar>(cd->columnType, td->tableId, cd->columnId, 0);
1589  const auto rewritten_exe_unit = query_rewrite->rewriteColumnarUpdate(
1590  work_unit.exe_unit, projected_column_to_update);
      // The rewritten unit's first target must not be of a variable-length type.
1591  if (rewritten_exe_unit.target_exprs.front()->get_type_info().is_varlen()) {
1592  throw std::runtime_error(
1593  "Variable length updates not yet supported on temporary tables.");
1594  }
1595  execute_update_ra_exe_unit(rewritten_exe_unit, is_aggregate);
1596  } else {
1597  execute_update_ra_exe_unit(work_unit.exe_unit, is_aggregate);
1598  }
1599  };
1600 
      // Dispatch on the concrete node type; both paths build a work unit with a
      // default (empty) sort specification.
1601  if (auto compound = dynamic_cast<const RelCompound*>(node)) {
1602  auto work_unit =
1603  createCompoundWorkUnit(compound, {{}, SortAlgorithm::Default, 0, 0}, eo_in);
1604 
1605  execute_update_for_node(compound, work_unit, compound->isAggregate());
1606  } else if (auto project = dynamic_cast<const RelProject*>(node)) {
1607  auto work_unit =
1608  createProjectWorkUnit(project, {{}, SortAlgorithm::Default, 0, 0}, eo_in);
1609 
      // For a simple projection over a sort node, the scan limit is set to the row
      // count of the sort's temporary table, looked up under the negated node id.
1610  if (project->isSimple()) {
1611  CHECK_EQ(size_t(1), project->inputCount());
1612  const auto input_ra = project->getInput(0);
1613  if (dynamic_cast<const RelSort*>(input_ra)) {
1614  const auto& input_table =
1615  get_temporary_table(&temporary_tables_, -input_ra->getId());
1616  CHECK(input_table);
1617  work_unit.exe_unit.scan_limit = input_table->rowCount();
1618  }
1619  }
1620 
1621  execute_update_for_node(project, work_unit, false);
1622  } else {
1623  throw std::runtime_error("Unsupported parent node for update: " + node->toString());
1624  }
1625 }
#define CHECK_EQ(x, y)
Definition: Logger.h:214
std::optional< std::function< void()> > post_execution_callback_
int32_t getErrorCode() const
Definition: ErrorHandling.h:55
WorkUnit createProjectWorkUnit(const RelProject *, const SortInfo &, const ExecutionOptions &eo)
TemporaryTables temporary_tables_
Driver for running cleanup processes on a table. TableOptimizer provides functions for various cleanu...
static void invalidateCaches()
WorkUnit createCompoundWorkUnit(const RelCompound *, const SortInfo &, const ExecutionOptions &eo)
const ResultSetPtr & get_temporary_table(const TemporaryTables *temporary_tables, const int table_id)
Definition: Execute.h:229
StorageIOFacility::UpdateCallback yieldUpdateCallback(UpdateTransactionParameters &update_parameters)
static CompilationOptions makeCpuOnly(const CompilationOptions &in)
const Catalog_Namespace::Catalog & cat_
const ColumnDescriptor * getMetadataForColumn(int tableId, const std::string &colName) const
#define CHECK(condition)
Definition: Logger.h:206
std::vector< InputTableInfo > get_table_infos(const std::vector< InputDescriptor > &input_descs, Executor *executor)
#define DEBUG_TIMER(name)
Definition: Logger.h:322
static std::string getErrorMessageFromCode(const int32_t error_code)
std::unique_ptr< TransactionParameters > dml_transaction_parameters_
Executor * executor_

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

ExecutionResult RelAlgExecutor::executeWorkUnit ( const WorkUnit & work_unit,
const std::vector< TargetMetaInfo > &  targets_meta,
const bool  is_agg,
const CompilationOptions & co_in,
const ExecutionOptions & eo,
RenderInfo * render_info,
const int64_t  queue_time_ms,
const std::optional< size_t >  previous_count = std::nullopt 
)
private

Definition at line 2870 of file RelAlgExecutor.cpp.

References CompilationOptions::allow_lazy_fetch, RelAlgExecutor::WorkUnit::body, anonymous_namespace{RelAlgExecutor.cpp}::build_render_targets(), anonymous_namespace{RelAlgExecutor.cpp}::can_use_bump_allocator(), cat_, CHECK, CHECK_EQ, CHECK_GT, anonymous_namespace{RelAlgExecutor.cpp}::compute_output_buffer_size(), computeWindow(), CPU, DEBUG_TIMER, anonymous_namespace{RelAlgExecutor.cpp}::decide_approx_count_distinct_implementation(), RegisteredQueryHint::defaults(), CompilationOptions::device_type, RelAlgExecutor::WorkUnit::exe_unit, anonymous_namespace{RelAlgExecutor.cpp}::exe_unit_has_quals(), executor_, ExecutionOptions::executor_type, Extern, ExecutionOptions::find_push_down_candidates, g_big_group_threshold, g_enable_window_functions, get_table_infos(), QueryExecutionError::getErrorCode(), getFilteredCountAll(), getNDVEstimation(), anonymous_namespace{RelAlgExecutor.cpp}::groups_approx_upper_bound(), handleOutOfMemoryRetry(), handlePersistentError(), INJECT_TIMER, anonymous_namespace{RelAlgExecutor.cpp}::is_agg(), anonymous_namespace{RelAlgExecutor.cpp}::is_window_execution_unit(), RenderInfo::isPotentialInSituRender(), isRowidLookup(), ExecutionOptions::just_calcite_explain, ExecutionOptions::just_explain, ExecutionOptions::just_validate, leaf_results_, RelAlgExecutor::WorkUnit::max_groups_buffer_entry_guess, query_dag_, ra_exec_unit_desc_for_caching(), CardinalityEstimationRequired::range(), run_benchmark_import::result, selectFiltersToBePushedDown(), RelAlgExecutionUnit::target_exprs, target_exprs_owned_, VLOG, and QueryExecutionError::wasMultifragKernelLaunch().

Referenced by executeAggregate(), executeCompound(), executeFilter(), executeProject(), executeSort(), and executeUnion().

2878  {
2880  auto timer = DEBUG_TIMER(__func__);
2881 
2882  auto co = co_in;
2883  ColumnCacheMap column_cache;
2884  if (is_window_execution_unit(work_unit.exe_unit)) {
2885  if (!g_enable_window_functions) {
2886  throw std::runtime_error("Window functions support is disabled");
2887  }
2888  co.device_type = ExecutorDeviceType::CPU;
2889  co.allow_lazy_fetch = false;
2890  computeWindow(work_unit.exe_unit, co, eo, column_cache, queue_time_ms);
2891  }
2892  if (!eo.just_explain && eo.find_push_down_candidates) {
2893  // find potential candidates:
2894  auto selected_filters = selectFiltersToBePushedDown(work_unit, co, eo);
2895  if (!selected_filters.empty() || eo.just_calcite_explain) {
2896  return ExecutionResult(selected_filters, eo.find_push_down_candidates);
2897  }
2898  }
2899  if (render_info && render_info->isPotentialInSituRender()) {
2900  co.allow_lazy_fetch = false;
2901  }
2902  const auto body = work_unit.body;
2903  CHECK(body);
2904  auto it = leaf_results_.find(body->getId());
2905  VLOG(3) << "body->getId()=" << body->getId() << " body->toString()=" << body->toString()
2906  << " it==leaf_results_.end()=" << (it == leaf_results_.end());
2907  if (it != leaf_results_.end()) {
2908  executor_->addTransientStringLiterals(work_unit.exe_unit,
2909  executor_->row_set_mem_owner_);
2910  auto& aggregated_result = it->second;
2911  auto& result_rows = aggregated_result.rs;
2912  ExecutionResult result(result_rows, aggregated_result.targets_meta);
2913  body->setOutputMetainfo(aggregated_result.targets_meta);
2914  if (render_info) {
2915  build_render_targets(*render_info, work_unit.exe_unit.target_exprs, targets_meta);
2916  }
2917  return result;
2918  }
2919  const auto table_infos = get_table_infos(work_unit.exe_unit, executor_);
2920 
2921  auto ra_exe_unit = decide_approx_count_distinct_implementation(
2922  work_unit.exe_unit, table_infos, executor_, co.device_type, target_exprs_owned_);
2923 
2924  // register query hint if query_dag_ is valid
2925  ra_exe_unit.query_hint =
2926  query_dag_ ? query_dag_->getQueryHints() : RegisteredQueryHint::defaults();
2927 
2928  auto max_groups_buffer_entry_guess = work_unit.max_groups_buffer_entry_guess;
2929  if (is_window_execution_unit(ra_exe_unit)) {
2930  CHECK_EQ(table_infos.size(), size_t(1));
2931  CHECK_EQ(table_infos.front().info.fragments.size(), size_t(1));
2932  max_groups_buffer_entry_guess =
2933  table_infos.front().info.fragments.front().getNumTuples();
2934  ra_exe_unit.scan_limit = max_groups_buffer_entry_guess;
2935  } else if (compute_output_buffer_size(ra_exe_unit) && !isRowidLookup(work_unit)) {
2936  if (previous_count && !exe_unit_has_quals(ra_exe_unit)) {
2937  ra_exe_unit.scan_limit = *previous_count;
2938  } else {
2939  // TODO(adb): enable bump allocator path for render queries
2940  if (can_use_bump_allocator(ra_exe_unit, co, eo) && !render_info) {
2941  ra_exe_unit.scan_limit = 0;
2942  ra_exe_unit.use_bump_allocator = true;
2943  } else if (eo.executor_type == ::ExecutorType::Extern) {
2944  ra_exe_unit.scan_limit = 0;
2945  } else if (!eo.just_explain) {
2946  const auto filter_count_all = getFilteredCountAll(work_unit, true, co, eo);
2947  if (filter_count_all) {
2948  ra_exe_unit.scan_limit = std::max(*filter_count_all, size_t(1));
2949  }
2950  }
2951  }
2952  }
2953  ExecutionResult result{std::make_shared<ResultSet>(std::vector<TargetInfo>{},
2954  co.device_type,
2956  nullptr,
2957  executor_->getCatalog(),
2958  executor_->blockSize(),
2959  executor_->gridSize()),
2960  {}};
2961 
2962  auto execute_and_handle_errors = [&](const auto max_groups_buffer_entry_guess_in,
2963  const bool has_cardinality_estimation,
2964  const bool has_ndv_estimation) -> ExecutionResult {
2965  // Note that the groups buffer entry guess may be modified during query execution.
2966  // Create a local copy so we can track those changes if we need to attempt a retry
2967  // due to OOM
2968  auto local_groups_buffer_entry_guess = max_groups_buffer_entry_guess_in;
2969  try {
2970  return {executor_->executeWorkUnit(local_groups_buffer_entry_guess,
2971  is_agg,
2972  table_infos,
2973  ra_exe_unit,
2974  co,
2975  eo,
2976  cat_,
2977  render_info,
2978  has_cardinality_estimation,
2979  column_cache),
2980  targets_meta};
2981  } catch (const QueryExecutionError& e) {
2982  if (!has_ndv_estimation && e.getErrorCode() < 0) {
2983  throw CardinalityEstimationRequired(/*range=*/0);
2984  }
2985  handlePersistentError(e.getErrorCode());
2986  return handleOutOfMemoryRetry(
2987  {ra_exe_unit, work_unit.body, local_groups_buffer_entry_guess},
2988  targets_meta,
2989  is_agg,
2990  co,
2991  eo,
2992  render_info,
2993  e.wasMultifragKernelLaunch(),
2994  queue_time_ms);
2995  }
2996  };
2997 
2998  auto cache_key = ra_exec_unit_desc_for_caching(ra_exe_unit);
2999  try {
3000  auto cached_cardinality = executor_->getCachedCardinality(cache_key);
3001  auto card = cached_cardinality.second;
3002  if (cached_cardinality.first && card >= 0) {
3003  result = execute_and_handle_errors(
3004  card, /*has_cardinality_estimation=*/true, /*has_ndv_estimation=*/false);
3005  } else {
3006  result = execute_and_handle_errors(
3007  max_groups_buffer_entry_guess,
3008  groups_approx_upper_bound(table_infos) <= g_big_group_threshold,
3009  /*has_ndv_estimation=*/false);
3010  }
3011  } catch (const CardinalityEstimationRequired& e) {
3012  // check the cardinality cache
3013  auto cached_cardinality = executor_->getCachedCardinality(cache_key);
3014  auto card = cached_cardinality.second;
3015  if (cached_cardinality.first && card >= 0) {
3016  result = execute_and_handle_errors(card, true, /*has_ndv_estimation=*/true);
3017  } else {
3018  const auto ndv_groups_estimation =
3019  getNDVEstimation(work_unit, e.range(), is_agg, co, eo);
3020  const auto estimated_groups_buffer_entry_guess =
3021  ndv_groups_estimation > 0 ? 2 * ndv_groups_estimation
3022  : 2 * groups_approx_upper_bound(table_infos);
3023  CHECK_GT(estimated_groups_buffer_entry_guess, size_t(0));
3024  result = execute_and_handle_errors(
3025  estimated_groups_buffer_entry_guess, true, /*has_ndv_estimation=*/true);
3026  if (!(eo.just_validate || eo.just_explain)) {
3027  executor_->addToCardinalityCache(cache_key, estimated_groups_buffer_entry_guess);
3028  }
3029  }
3030  }
3031 
3032  result.setQueueTime(queue_time_ms);
3033  if (render_info) {
3034  build_render_targets(*render_info, work_unit.exe_unit.target_exprs, targets_meta);
3035  if (render_info->isPotentialInSituRender()) {
3036  // return an empty result (with the same queue time, and zero render time)
3037  return {std::make_shared<ResultSet>(
3038  queue_time_ms,
3039  0,
3040  executor_->row_set_mem_owner_
3041  ? executor_->row_set_mem_owner_->cloneStrDictDataOnly()
3042  : nullptr),
3043  {}};
3044  }
3045  }
3046  return result;
3047 }
bool is_agg(const Analyzer::Expr *expr)
#define CHECK_EQ(x, y)
Definition: Logger.h:214
int32_t getErrorCode() const
Definition: ErrorHandling.h:55
bool can_use_bump_allocator(const RelAlgExecutionUnit &ra_exe_unit, const CompilationOptions &co, const ExecutionOptions &eo)
std::string ra_exec_unit_desc_for_caching(const RelAlgExecutionUnit &ra_exe_unit)
Definition: Execute.cpp:1236
std::vector< PushedDownFilterInfo > selectFiltersToBePushedDown(const RelAlgExecutor::WorkUnit &work_unit, const CompilationOptions &co, const ExecutionOptions &eo)
std::unordered_map< unsigned, AggregatedResult > leaf_results_
#define CHECK_GT(x, y)
Definition: Logger.h:218
bool is_window_execution_unit(const RelAlgExecutionUnit &ra_exe_unit)
void computeWindow(const RelAlgExecutionUnit &ra_exe_unit, const CompilationOptions &co, const ExecutionOptions &eo, ColumnCacheMap &column_cache_map, const int64_t queue_time_ms)
std::vector< std::shared_ptr< Analyzer::Expr > > target_exprs_owned_
ExecutorType executor_type
#define INJECT_TIMER(DESC)
Definition: measure.h:93
const Catalog_Namespace::Catalog & cat_
size_t g_big_group_threshold
Definition: Execute.cpp:103
ExecutionResult handleOutOfMemoryRetry(const RelAlgExecutor::WorkUnit &work_unit, const std::vector< TargetMetaInfo > &targets_meta, const bool is_agg, const CompilationOptions &co, const ExecutionOptions &eo, RenderInfo *render_info, const bool was_multifrag_kernel_launch, const int64_t queue_time_ms)
size_t groups_approx_upper_bound(const std::vector< InputTableInfo > &table_infos)
size_t getNDVEstimation(const WorkUnit &work_unit, const int64_t range, const bool is_agg, const CompilationOptions &co, const ExecutionOptions &eo)
void build_render_targets(RenderInfo &render_info, const std::vector< Analyzer::Expr * > &work_unit_target_exprs, const std::vector< TargetMetaInfo > &targets_meta)
std::optional< size_t > getFilteredCountAll(const WorkUnit &work_unit, const bool is_agg, const CompilationOptions &co, const ExecutionOptions &eo)
std::unordered_map< int, std::unordered_map< int, std::shared_ptr< const ColumnarResults >>> ColumnCacheMap
bool g_enable_window_functions
Definition: Execute.cpp:104
static RegisteredQueryHint defaults()
Definition: QueryHint.h:175
bool isRowidLookup(const WorkUnit &work_unit)
bool exe_unit_has_quals(const RelAlgExecutionUnit ra_exe_unit)
static void handlePersistentError(const int32_t error_code)
bool compute_output_buffer_size(const RelAlgExecutionUnit &ra_exe_unit)
bool wasMultifragKernelLaunch() const
Definition: ErrorHandling.h:57
bool isPotentialInSituRender() const
Definition: RenderInfo.cpp:64
#define CHECK(condition)
Definition: Logger.h:206
std::vector< InputTableInfo > get_table_infos(const std::vector< InputDescriptor > &input_descs, Executor *executor)
#define DEBUG_TIMER(name)
Definition: Logger.h:322
std::unique_ptr< RelAlgDagBuilder > query_dag_
RelAlgExecutionUnit decide_approx_count_distinct_implementation(const RelAlgExecutionUnit &ra_exe_unit_in, const std::vector< InputTableInfo > &table_infos, const Executor *executor, const ExecutorDeviceType device_type_in, std::vector< std::shared_ptr< Analyzer::Expr >> &target_exprs_owned)
ExecutionResult executeWorkUnit(const WorkUnit &work_unit, const std::vector< TargetMetaInfo > &targets_meta, const bool is_agg, const CompilationOptions &co_in, const ExecutionOptions &eo, RenderInfo *, const int64_t queue_time_ms, const std::optional< size_t > previous_count=std::nullopt)
Executor * executor_
#define VLOG(n)
Definition: Logger.h:300

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

std::string RelAlgExecutor::getErrorMessageFromCode ( const int32_t  error_code)
static

Definition at line 3337 of file RelAlgExecutor.cpp.

References anonymous_namespace{RelAlgExecutor.cpp}::getErrorDescription(), and to_string().

Referenced by executeDelete(), executeUpdate(), getNDVEstimation(), and handlePersistentError().

3337  {
3338  if (error_code < 0) {
3339  return "Ran out of slots in the query output buffer";
3340  }
3341  const auto errorInfo = getErrorDescription(error_code);
3342 
3343  if (errorInfo.code) {
3344  return errorInfo.code + ": "s + errorInfo.description;
3345  } else {
3346  return "Other error: code "s + std::to_string(error_code);
3347  }
3348 }
ErrorInfo getErrorDescription(const int32_t error_code)
std::string to_string(char const *&&v)

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

Executor * RelAlgExecutor::getExecutor ( ) const

Definition at line 479 of file RelAlgExecutor.cpp.

References executor_.

479  {
480  return executor_;
481 }
Executor * executor_
std::optional< size_t > RelAlgExecutor::getFilteredCountAll ( const WorkUnit &  work_unit,
const bool  is_agg,
const CompilationOptions &  co,
const ExecutionOptions &  eo 
)
private

Definition at line 3049 of file RelAlgExecutor.cpp.

References cat_, CHECK, CHECK_EQ, CHECK_GE, count, create_count_all_execution_unit(), RelAlgExecutor::WorkUnit::exe_unit, executor_, g_bigint_count, get_table_infos(), kBIGINT, kCOUNT, kINT, LOG, and logger::WARNING.

Referenced by executeWorkUnit().

3052  {
3053  const auto count =
3054  makeExpr<Analyzer::AggExpr>(SQLTypeInfo(g_bigint_count ? kBIGINT : kINT, false),
3055  kCOUNT,
3056  nullptr,
3057  false,
3058  nullptr);
3059  const auto count_all_exe_unit =
3060  create_count_all_execution_unit(work_unit.exe_unit, count);
3061  size_t one{1};
3062  ResultSetPtr count_all_result;
3063  try {
3064  ColumnCacheMap column_cache;
3065  count_all_result =
3066  executor_->executeWorkUnit(one,
3067  is_agg,
3068  get_table_infos(work_unit.exe_unit, executor_),
3069  count_all_exe_unit,
3070  co,
3071  eo,
3072  cat_,
3073  nullptr,
3074  false,
3075  column_cache);
3076  } catch (const foreign_storage::ForeignStorageException& error) {
3077  throw error;
3078  } catch (const QueryMustRunOnCpu&) {
3079  // force a retry of the top level query on CPU
3080  throw;
3081  } catch (const std::exception& e) {
3082  LOG(WARNING) << "Failed to run pre-flight filtered count with error " << e.what();
3083  return std::nullopt;
3084  }
3085  const auto count_row = count_all_result->getNextRow(false, false);
3086  CHECK_EQ(size_t(1), count_row.size());
3087  const auto& count_tv = count_row.front();
3088  const auto count_scalar_tv = boost::get<ScalarTargetValue>(&count_tv);
3089  CHECK(count_scalar_tv);
3090  const auto count_ptr = boost::get<int64_t>(count_scalar_tv);
3091  CHECK(count_ptr);
3092  CHECK_GE(*count_ptr, 0);
3093  auto count_upper_bound = static_cast<size_t>(*count_ptr);
3094  return std::max(count_upper_bound, size_t(1));
3095 }
bool is_agg(const Analyzer::Expr *expr)
#define CHECK_EQ(x, y)
Definition: Logger.h:214
#define LOG(tag)
Definition: Logger.h:200
#define CHECK_GE(x, y)
Definition: Logger.h:219
std::shared_ptr< ResultSet > ResultSetPtr
RelAlgExecutionUnit create_count_all_execution_unit(const RelAlgExecutionUnit &ra_exe_unit, std::shared_ptr< Analyzer::Expr > replacement_target)
int count
const Catalog_Namespace::Catalog & cat_
bool g_bigint_count
std::unordered_map< int, std::unordered_map< int, std::shared_ptr< const ColumnarResults >>> ColumnCacheMap
Definition: sqldefs.h:76
#define CHECK(condition)
Definition: Logger.h:206
std::vector< InputTableInfo > get_table_infos(const std::vector< InputDescriptor > &input_descs, Executor *executor)
Definition: sqltypes.h:44
Executor * executor_

+ Here is the call graph for this function:

+ Here is the caller graph for this function: