OmniSciDB  a667adc9c8
Executor Class Reference

#include <Execute.h>


Classes

struct  ExecutorMutexHolder
 
class  FetchCacheAnchor
 
struct  GroupColLLVMValue
 
struct  JoinHashTableOrError
 

Public Types

using ExecutorId = size_t
 
using CachedCardinality = std::pair< bool, size_t >
 

Public Member Functions

 Executor (const ExecutorId id, const size_t block_size_x, const size_t grid_size_x, const size_t max_gpu_slab_size, const std::string &debug_dir, const std::string &debug_file)
 
const TemporaryTables * getTemporaryTables ()
 
StringDictionaryProxy * getStringDictionaryProxy (const int dict_id, const bool with_generation) const
 
StringDictionaryProxy * getStringDictionaryProxy (const int dictId, const std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner, const bool with_generation) const
 
bool isCPUOnly () const
 
bool isArchMaxwell (const ExecutorDeviceType dt) const
 
bool containsLeftDeepOuterJoin () const
 
const ColumnDescriptor * getColumnDescriptor (const Analyzer::ColumnVar *) const
 
const ColumnDescriptor * getPhysicalColumnDescriptor (const Analyzer::ColumnVar *, int) const
 
const Catalog_Namespace::Catalog * getCatalog () const
 
void setCatalog (const Catalog_Namespace::Catalog *catalog)
 
const std::shared_ptr< RowSetMemoryOwner > getRowSetMemoryOwner () const
 
const TemporaryTables * getTemporaryTables () const
 
Fragmenter_Namespace::TableInfo getTableInfo (const int table_id) const
 
const TableGeneration & getTableGeneration (const int table_id) const
 
ExpressionRange getColRange (const PhysicalInput &) const
 
size_t getNumBytesForFetchedRow (const std::set< int > &table_ids_to_fetch) const
 
std::vector< ColumnLazyFetchInfo > getColLazyFetchInfo (const std::vector< Analyzer::Expr * > &target_exprs) const
 
void registerActiveModule (void *module, const int device_id) const
 
void unregisterActiveModule (void *module, const int device_id) const
 
void interrupt (const QuerySessionId &query_session="", const QuerySessionId &interrupt_session="")
 
void resetInterrupt ()
 
void enableRuntimeQueryInterrupt (const double runtime_query_check_freq, const unsigned pending_query_check_freq) const
 
int8_t warpSize () const
 
unsigned gridSize () const
 
unsigned numBlocksPerMP () const
 
unsigned blockSize () const
 
size_t maxGpuSlabSize () const
 
ResultSetPtr executeWorkUnit (size_t &max_groups_buffer_entry_guess, const bool is_agg, const std::vector< InputTableInfo > &, const RelAlgExecutionUnit &, const CompilationOptions &, const ExecutionOptions &options, const Catalog_Namespace::Catalog &, RenderInfo *render_info, const bool has_cardinality_estimation, ColumnCacheMap &column_cache)
 
TableUpdateMetadata executeUpdate (const RelAlgExecutionUnit &ra_exe_unit, const std::vector< InputTableInfo > &table_infos, const CompilationOptions &co, const ExecutionOptions &eo, const Catalog_Namespace::Catalog &cat, std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner, const UpdateLogForFragment::Callback &cb, const bool is_agg)
 
void setupCaching (const std::unordered_set< PhysicalInput > &phys_inputs, const std::unordered_set< int > &phys_table_ids)
 
void setColRangeCache (const AggregatedColRange &aggregated_col_range)
 
QuerySessionId & getCurrentQuerySession (mapd_shared_lock< mapd_shared_mutex > &read_lock)
 
size_t getRunningExecutorId (mapd_shared_lock< mapd_shared_mutex > &read_lock)
 
void setCurrentQuerySession (const QuerySessionId &query_session, mapd_unique_lock< mapd_shared_mutex > &write_lock)
 
void setRunningExecutorId (const size_t id, mapd_unique_lock< mapd_shared_mutex > &write_lock)
 
bool checkCurrentQuerySession (const std::string &candidate_query_session, mapd_shared_lock< mapd_shared_mutex > &read_lock)
 
void invalidateRunningQuerySession (mapd_unique_lock< mapd_shared_mutex > &write_lock)
 
bool addToQuerySessionList (const QuerySessionId &query_session, const std::string &query_str, const std::string &submitted, const size_t executor_id, const QuerySessionStatus::QueryStatus query_status, mapd_unique_lock< mapd_shared_mutex > &write_lock)
 
bool removeFromQuerySessionList (const QuerySessionId &query_session, const std::string &submitted_time_str, mapd_unique_lock< mapd_shared_mutex > &write_lock)
 
void setQuerySessionAsInterrupted (const QuerySessionId &query_session, mapd_unique_lock< mapd_shared_mutex > &write_lock)
 
void resetQuerySessionInterruptFlag (const std::string &query_session, mapd_unique_lock< mapd_shared_mutex > &write_lock)
 
bool checkIsQuerySessionInterrupted (const std::string &query_session, mapd_shared_lock< mapd_shared_mutex > &read_lock)
 
bool checkIsRunningQuerySessionInterrupted ()
 
bool checkIsQuerySessionEnrolled (const QuerySessionId &query_session, mapd_shared_lock< mapd_shared_mutex > &read_lock)
 
bool updateQuerySessionStatusWithLock (const QuerySessionId &query_session, const std::string &submitted_time_str, const QuerySessionStatus::QueryStatus updated_query_status, mapd_unique_lock< mapd_shared_mutex > &write_lock)
 
bool updateQuerySessionExecutorAssignment (const QuerySessionId &query_session, const std::string &submitted_time_str, const size_t executor_id, mapd_unique_lock< mapd_shared_mutex > &write_lock)
 
std::vector< QuerySessionStatus > getQuerySessionInfo (const QuerySessionId &query_session, mapd_shared_lock< mapd_shared_mutex > &read_lock)
 
mapd_shared_mutex & getSessionLock ()
 
CurrentQueryStatus attachExecutorToQuerySession (const QuerySessionId &query_session_id, const std::string &query_str, const std::string &query_submitted_time)
 
void checkPendingQueryStatus (const QuerySessionId &query_session)
 
void clearQuerySessionStatus (const QuerySessionId &query_session, const std::string &submitted_time_str, const bool acquire_spin_lock)
 
void updateQuerySessionStatus (std::shared_ptr< const query_state::QueryState > &query_state, const QuerySessionStatus::QueryStatus new_query_status)
 
void updateQuerySessionStatus (const QuerySessionId &query_session, const std::string &submitted_time_str, const QuerySessionStatus::QueryStatus new_query_status)
 
void enrollQuerySession (const QuerySessionId &query_session, const std::string &query_str, const std::string &submitted_time_str, const size_t executor_id, const QuerySessionStatus::QueryStatus query_session_status)
 
void addToCardinalityCache (const std::string &cache_key, const size_t cache_value)
 
CachedCardinality getCachedCardinality (const std::string &cache_key)
 
template<typename THREAD_POOL >
void launchKernels (SharedKernelContext &shared_context, std::vector< std::unique_ptr< ExecutionKernel >> &&kernels)
 

Static Public Member Functions

static std::shared_ptr< Executor > getExecutor (const ExecutorId id, const std::string &debug_dir="", const std::string &debug_file="", const SystemParameters &system_parameters=SystemParameters())
 
static void nukeCacheOfExecutors ()
 
static void clearMemory (const Data_Namespace::MemoryLevel memory_level)
 
static size_t getArenaBlockSize ()
 
static std::pair< int64_t, int32_t > reduceResults (const SQLAgg agg, const SQLTypeInfo &ti, const int64_t agg_init_val, const int8_t out_byte_width, const int64_t *out_vec, const size_t out_vec_sz, const bool is_group_by, const bool float_argument_input)
 
static void addCodeToCache (const CodeCacheKey &, std::shared_ptr< CompilationContext >, llvm::Module *, CodeCache &)
 

Static Public Attributes

static const ExecutorId UNITARY_EXECUTOR_ID = 0
 
static const size_t high_scan_limit
 
static const int32_t ERR_DIV_BY_ZERO {1}
 
static const int32_t ERR_OUT_OF_GPU_MEM {2}
 
static const int32_t ERR_OUT_OF_SLOTS {3}
 
static const int32_t ERR_UNSUPPORTED_SELF_JOIN {4}
 
static const int32_t ERR_OUT_OF_RENDER_MEM {5}
 
static const int32_t ERR_OUT_OF_CPU_MEM {6}
 
static const int32_t ERR_OVERFLOW_OR_UNDERFLOW {7}
 
static const int32_t ERR_OUT_OF_TIME {9}
 
static const int32_t ERR_INTERRUPTED {10}
 
static const int32_t ERR_COLUMNAR_CONVERSION_NOT_SUPPORTED {11}
 
static const int32_t ERR_TOO_MANY_LITERALS {12}
 
static const int32_t ERR_STRING_CONST_IN_RESULTSET {13}
 
static const int32_t ERR_STREAMING_TOP_N_NOT_SUPPORTED_IN_RENDER_QUERY {14}
 
static const int32_t ERR_SINGLE_VALUE_FOUND_MULTIPLE_VALUES {15}
 
static const int32_t ERR_GEOS {16}
 
static std::mutex compilation_mutex_
 
static std::mutex kernel_mutex_
 

Private Types

using PerFragmentCallBack = std::function< void(ResultSetPtr, const Fragmenter_Namespace::FragmentInfo &)>
 

Private Member Functions

void clearMetaInfoCache ()
 
int deviceCount (const ExecutorDeviceType) const
 
int deviceCountForMemoryLevel (const Data_Namespace::MemoryLevel memory_level) const
 
llvm::Value * codegenWindowFunction (const size_t target_index, const CompilationOptions &co)
 
llvm::Value * codegenWindowFunctionAggregate (const CompilationOptions &co)
 
llvm::BasicBlock * codegenWindowResetStateControlFlow ()
 
void codegenWindowFunctionStateInit (llvm::Value *aggregate_state)
 
llvm::Value * codegenWindowFunctionAggregateCalls (llvm::Value *aggregate_state, const CompilationOptions &co)
 
void codegenWindowAvgEpilogue (llvm::Value *crt_val, llvm::Value *window_func_null_val, llvm::Value *multiplicity_lv)
 
llvm::Value * codegenAggregateWindowState ()
 
llvm::Value * aggregateWindowStatePtr ()
 
bool isArchPascalOrLater (const ExecutorDeviceType dt) const
 
bool needFetchAllFragments (const InputColDescriptor &col_desc, const RelAlgExecutionUnit &ra_exe_unit, const FragmentsList &selected_fragments) const
 
bool needLinearizeAllFragments (const ColumnDescriptor *cd, const InputColDescriptor &inner_col_desc, const RelAlgExecutionUnit &ra_exe_unit, const FragmentsList &selected_fragments, const Data_Namespace::MemoryLevel memory_level) const
 
void executeWorkUnitPerFragment (const RelAlgExecutionUnit &ra_exe_unit, const InputTableInfo &table_info, const CompilationOptions &co, const ExecutionOptions &eo, const Catalog_Namespace::Catalog &cat, PerFragmentCallBack &cb, const std::set< size_t > &fragment_indexes_param)
 Compiles and dispatches a work unit per fragment, processing results with the per-fragment callback. Currently used for computing metrics over fragments (metadata). More...
 
ResultSetPtr executeExplain (const QueryCompilationDescriptor &)
 
ResultSetPtr executeTableFunction (const TableFunctionExecutionUnit exe_unit, const std::vector< InputTableInfo > &table_infos, const CompilationOptions &co, const ExecutionOptions &eo, const Catalog_Namespace::Catalog &cat)
 Compiles and dispatches a table function; that is, a function that takes as input one or more columns and returns a ResultSet, which can be parsed by subsequent execution steps. More...
 
ExecutorDeviceType getDeviceTypeForTargets (const RelAlgExecutionUnit &ra_exe_unit, const ExecutorDeviceType requested_device_type)
 
ResultSetPtr collectAllDeviceResults (SharedKernelContext &shared_context, const RelAlgExecutionUnit &ra_exe_unit, const QueryMemoryDescriptor &query_mem_desc, const ExecutorDeviceType device_type, std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner)
 
ResultSetPtr collectAllDeviceShardedTopResults (SharedKernelContext &shared_context, const RelAlgExecutionUnit &ra_exe_unit) const
 
std::unordered_map< int, const Analyzer::BinOper * > getInnerTabIdToJoinCond () const
 
std::vector< std::unique_ptr< ExecutionKernel > > createKernels (SharedKernelContext &shared_context, const RelAlgExecutionUnit &ra_exe_unit, ColumnFetcher &column_fetcher, const std::vector< InputTableInfo > &table_infos, const ExecutionOptions &eo, const bool is_agg, const bool allow_single_frag_table_opt, const size_t context_count, const QueryCompilationDescriptor &query_comp_desc, const QueryMemoryDescriptor &query_mem_desc, RenderInfo *render_info, std::unordered_set< int > &available_gpus, int &available_cpus)
 
template<typename THREAD_POOL >
void launchKernels (SharedKernelContext &shared_context, std::vector< std::unique_ptr< ExecutionKernel >> &&kernels)
 
std::vector< size_t > getTableFragmentIndices (const RelAlgExecutionUnit &ra_exe_unit, const ExecutorDeviceType device_type, const size_t table_idx, const size_t outer_frag_idx, std::map< int, const TableFragments * > &selected_tables_fragments, const std::unordered_map< int, const Analyzer::BinOper * > &inner_table_id_to_join_condition)
 
bool skipFragmentPair (const Fragmenter_Namespace::FragmentInfo &outer_fragment_info, const Fragmenter_Namespace::FragmentInfo &inner_fragment_info, const int inner_table_id, const std::unordered_map< int, const Analyzer::BinOper * > &inner_table_id_to_join_condition, const RelAlgExecutionUnit &ra_exe_unit, const ExecutorDeviceType device_type)
 
FetchResult fetchChunks (const ColumnFetcher &, const RelAlgExecutionUnit &ra_exe_unit, const int device_id, const Data_Namespace::MemoryLevel, const std::map< int, const TableFragments * > &, const FragmentsList &selected_fragments, const Catalog_Namespace::Catalog &, std::list< ChunkIter > &, std::list< std::shared_ptr< Chunk_NS::Chunk >> &, DeviceAllocator *device_allocator, const size_t thread_idx, const bool allow_runtime_interrupt)
 
FetchResult fetchUnionChunks (const ColumnFetcher &, const RelAlgExecutionUnit &ra_exe_unit, const int device_id, const Data_Namespace::MemoryLevel, const std::map< int, const TableFragments * > &, const FragmentsList &selected_fragments, const Catalog_Namespace::Catalog &, std::list< ChunkIter > &, std::list< std::shared_ptr< Chunk_NS::Chunk >> &, DeviceAllocator *device_allocator, const size_t thread_idx, const bool allow_runtime_interrupt)
 
std::pair< std::vector< std::vector< int64_t > >, std::vector< std::vector< uint64_t > > > getRowCountAndOffsetForAllFrags (const RelAlgExecutionUnit &ra_exe_unit, const CartesianProduct< std::vector< std::vector< size_t >>> &frag_ids_crossjoin, const std::vector< InputDescriptor > &input_descs, const std::map< int, const TableFragments * > &all_tables_fragments)
 
void buildSelectedFragsMapping (std::vector< std::vector< size_t >> &selected_fragments_crossjoin, std::vector< size_t > &local_col_to_frag_pos, const std::list< std::shared_ptr< const InputColDescriptor >> &col_global_ids, const FragmentsList &selected_fragments, const RelAlgExecutionUnit &ra_exe_unit)
 
void buildSelectedFragsMappingForUnion (std::vector< std::vector< size_t >> &selected_fragments_crossjoin, std::vector< size_t > &local_col_to_frag_pos, const std::list< std::shared_ptr< const InputColDescriptor >> &col_global_ids, const FragmentsList &selected_fragments, const RelAlgExecutionUnit &ra_exe_unit)
 
std::vector< size_t > getFragmentCount (const FragmentsList &selected_fragments, const size_t scan_idx, const RelAlgExecutionUnit &ra_exe_unit)
 
int32_t executePlanWithGroupBy (const RelAlgExecutionUnit &ra_exe_unit, const CompilationResult &, const bool hoist_literals, ResultSetPtr &results, const ExecutorDeviceType device_type, std::vector< std::vector< const int8_t * >> &col_buffers, const std::vector< size_t > outer_tab_frag_ids, QueryExecutionContext *, const std::vector< std::vector< int64_t >> &num_rows, const std::vector< std::vector< uint64_t >> &frag_offsets, Data_Namespace::DataMgr *, const int device_id, const int outer_table_id, const int64_t limit, const uint32_t start_rowid, const uint32_t num_tables, const bool allow_runtime_interrupt, RenderInfo *render_info)
 
int32_t executePlanWithoutGroupBy (const RelAlgExecutionUnit &ra_exe_unit, const CompilationResult &, const bool hoist_literals, ResultSetPtr &results, const std::vector< Analyzer::Expr * > &target_exprs, const ExecutorDeviceType device_type, std::vector< std::vector< const int8_t * >> &col_buffers, QueryExecutionContext *query_exe_context, const std::vector< std::vector< int64_t >> &num_rows, const std::vector< std::vector< uint64_t >> &frag_offsets, Data_Namespace::DataMgr *data_mgr, const int device_id, const uint32_t start_rowid, const uint32_t num_tables, const bool allow_runtime_interrupt, RenderInfo *render_info)
 
ResultSetPtr resultsUnion (SharedKernelContext &shared_context, const RelAlgExecutionUnit &ra_exe_unit)
 
std::vector< int64_t > getJoinHashTablePtrs (const ExecutorDeviceType device_type, const int device_id)
 
ResultSetPtr reduceMultiDeviceResults (const RelAlgExecutionUnit &, std::vector< std::pair< ResultSetPtr, std::vector< size_t >>> &all_fragment_results, std::shared_ptr< RowSetMemoryOwner >, const QueryMemoryDescriptor &) const
 
ResultSetPtr reduceMultiDeviceResultSets (std::vector< std::pair< ResultSetPtr, std::vector< size_t >>> &all_fragment_results, std::shared_ptr< RowSetMemoryOwner >, const QueryMemoryDescriptor &) const
 
ResultSetPtr reduceSpeculativeTopN (const RelAlgExecutionUnit &, std::vector< std::pair< ResultSetPtr, std::vector< size_t >>> &all_fragment_results, std::shared_ptr< RowSetMemoryOwner >, const QueryMemoryDescriptor &) const
 
ResultSetPtr executeWorkUnitImpl (size_t &max_groups_buffer_entry_guess, const bool is_agg, const bool allow_single_frag_table_opt, const std::vector< InputTableInfo > &, const RelAlgExecutionUnit &, const CompilationOptions &, const ExecutionOptions &options, const Catalog_Namespace::Catalog &, std::shared_ptr< RowSetMemoryOwner >, RenderInfo *render_info, const bool has_cardinality_estimation, ColumnCacheMap &column_cache)
 
std::vector< llvm::Value * > inlineHoistedLiterals ()
 
std::tuple< CompilationResult, std::unique_ptr< QueryMemoryDescriptor > > compileWorkUnit (const std::vector< InputTableInfo > &query_infos, const PlanState::DeletedColumnsMap &deleted_cols_map, const RelAlgExecutionUnit &ra_exe_unit, const CompilationOptions &co, const ExecutionOptions &eo, const CudaMgr_Namespace::CudaMgr *cuda_mgr, const bool allow_lazy_fetch, std::shared_ptr< RowSetMemoryOwner >, const size_t max_groups_buffer_entry_count, const int8_t crt_min_byte_width, const bool has_cardinality_estimation, ColumnCacheMap &column_cache, RenderInfo *render_info=nullptr)
 
llvm::BasicBlock * codegenSkipDeletedOuterTableRow (const RelAlgExecutionUnit &ra_exe_unit, const CompilationOptions &co)
 
std::vector< JoinLoop > buildJoinLoops (RelAlgExecutionUnit &ra_exe_unit, const CompilationOptions &co, const ExecutionOptions &eo, const std::vector< InputTableInfo > &query_infos, ColumnCacheMap &column_cache)
 
JoinLoop::HoistedFiltersCallback buildHoistLeftHandSideFiltersCb (const RelAlgExecutionUnit &ra_exe_unit, const size_t level_idx, const int inner_table_id, const CompilationOptions &co)
 
std::function< llvm::Value *(const std::vector< llvm::Value * > &, llvm::Value *)> buildIsDeletedCb (const RelAlgExecutionUnit &ra_exe_unit, const size_t level_idx, const CompilationOptions &co)
 
std::shared_ptr< HashJoin > buildCurrentLevelHashTable (const JoinCondition &current_level_join_conditions, RelAlgExecutionUnit &ra_exe_unit, const CompilationOptions &co, const std::vector< InputTableInfo > &query_infos, ColumnCacheMap &column_cache, std::vector< std::string > &fail_reasons)
 
void redeclareFilterFunction ()
 
llvm::Value * addJoinLoopIterator (const std::vector< llvm::Value * > &prev_iters, const size_t level_idx)
 
void codegenJoinLoops (const std::vector< JoinLoop > &join_loops, const RelAlgExecutionUnit &ra_exe_unit, GroupByAndAggregate &group_by_and_aggregate, llvm::Function *query_func, llvm::BasicBlock *entry_bb, const QueryMemoryDescriptor &query_mem_desc, const CompilationOptions &co, const ExecutionOptions &eo)
 
bool compileBody (const RelAlgExecutionUnit &ra_exe_unit, GroupByAndAggregate &group_by_and_aggregate, const QueryMemoryDescriptor &query_mem_desc, const CompilationOptions &co, const GpuSharedMemoryContext &gpu_smem_context={})
 
void createErrorCheckControlFlow (llvm::Function *query_func, bool run_with_dynamic_watchdog, bool run_with_allowing_runtime_interrupt, ExecutorDeviceType device_type, const std::vector< InputTableInfo > &input_table_infos)
 
void insertErrorCodeChecker (llvm::Function *query_func, bool hoist_literals, bool allow_runtime_query_interrupt)
 
void preloadFragOffsets (const std::vector< InputDescriptor > &input_descs, const std::vector< InputTableInfo > &query_infos)
 
JoinHashTableOrError buildHashTableForQualifier (const std::shared_ptr< Analyzer::BinOper > &qual_bin_oper, const std::vector< InputTableInfo > &query_infos, const MemoryLevel memory_level, const HashType preferred_hash_type, ColumnCacheMap &column_cache, const QueryHint &query_hint)
 
void nukeOldState (const bool allow_lazy_fetch, const std::vector< InputTableInfo > &query_infos, const PlanState::DeletedColumnsMap &deleted_cols_map, const RelAlgExecutionUnit *ra_exe_unit)
 
std::shared_ptr< CompilationContext > optimizeAndCodegenCPU (llvm::Function *, llvm::Function *, const std::unordered_set< llvm::Function * > &, const CompilationOptions &)
 
std::shared_ptr< CompilationContext > optimizeAndCodegenGPU (llvm::Function *, llvm::Function *, std::unordered_set< llvm::Function * > &, const bool no_inline, const CudaMgr_Namespace::CudaMgr *cuda_mgr, const CompilationOptions &)
 
std::string generatePTX (const std::string &) const
 
void initializeNVPTXBackend () const
 
int64_t deviceCycles (int milliseconds) const
 
GroupColLLVMValue groupByColumnCodegen (Analyzer::Expr *group_by_col, const size_t col_width, const CompilationOptions &, const bool translate_null_val, const int64_t translated_null_val, GroupByAndAggregate::DiamondCodegen &, std::stack< llvm::BasicBlock * > &, const bool thread_mem_shared)
 
llvm::Value * castToFP (llvm::Value *, SQLTypeInfo const &from_ti, SQLTypeInfo const &to_ti)
 
llvm::Value * castToIntPtrTyIn (llvm::Value *val, const size_t bit_width)
 
std::tuple< RelAlgExecutionUnit, PlanState::DeletedColumnsMap > addDeletedColumn (const RelAlgExecutionUnit &ra_exe_unit, const CompilationOptions &co)
 
bool isFragmentFullyDeleted (const int table_id, const Fragmenter_Namespace::FragmentInfo &fragment)
 
std::pair< bool, int64_t > skipFragment (const InputDescriptor &table_desc, const Fragmenter_Namespace::FragmentInfo &frag_info, const std::list< std::shared_ptr< Analyzer::Expr >> &simple_quals, const std::vector< uint64_t > &frag_offsets, const size_t frag_idx)
 
std::pair< bool, int64_t > skipFragmentInnerJoins (const InputDescriptor &table_desc, const RelAlgExecutionUnit &ra_exe_unit, const Fragmenter_Namespace::FragmentInfo &fragment, const std::vector< uint64_t > &frag_offsets, const size_t frag_idx)
 
AggregatedColRange computeColRangesCache (const std::unordered_set< PhysicalInput > &phys_inputs)
 
StringDictionaryGenerations computeStringDictionaryGenerations (const std::unordered_set< PhysicalInput > &phys_inputs)
 
TableGenerations computeTableGenerations (std::unordered_set< int > phys_table_ids)
 
std::shared_ptr< CompilationContext > getCodeFromCache (const CodeCacheKey &, const CodeCache &)
 
std::vector< int8_t > serializeLiterals (const std::unordered_map< int, CgenState::LiteralValues > &literals, const int device_id)
 
llvm::Value * spillDoubleElement (llvm::Value *elem_val, llvm::Type *elem_ty)
 
ExecutorMutexHolder acquireExecuteMutex ()
 

Static Private Member Functions

static size_t align (const size_t off_in, const size_t alignment)
 

Private Attributes

std::unique_ptr< CgenState > cgen_state_
 
std::unique_ptr< PlanState > plan_state_
 
std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner_
 
std::mutex gpu_exec_mutex_ [max_gpu_count]
 
std::mutex str_dict_mutex_
 
std::unique_ptr< llvm::TargetMachine > nvptx_target_machine_
 
CodeCache cpu_code_cache_
 
CodeCache gpu_code_cache_
 
const unsigned block_size_x_
 
const unsigned grid_size_x_
 
const size_t max_gpu_slab_size_
 
const std::string debug_dir_
 
const std::string debug_file_
 
const ExecutorId executor_id_
 
const Catalog_Namespace::Catalog * catalog_
 
const TemporaryTables * temporary_tables_
 
int64_t kernel_queue_time_ms_ = 0
 
int64_t compilation_queue_time_ms_ = 0
 
std::unique_ptr< WindowProjectNodeContext > window_project_node_context_owned_
 
WindowFunctionContext * active_window_function_ {nullptr}
 
InputTableInfoCache input_table_info_cache_
 
AggregatedColRange agg_col_range_cache_
 
TableGenerations table_generations_
 

Static Private Attributes

static const int max_gpu_count {16}
 
static std::mutex gpu_active_modules_mutex_
 
static uint32_t gpu_active_modules_device_mask_ {0x0}
 
static void * gpu_active_modules_ [max_gpu_count]
 
static std::atomic< bool > interrupted_ {false}
 
static const size_t baseline_threshold
 
static const size_t code_cache_size {1000}
 
static mapd_shared_mutex executor_session_mutex_
 
static QuerySessionId current_query_session_ {""}
 
static size_t running_query_executor_id_ {0}
 
static InterruptFlagMap queries_interrupt_flag_
 
static QuerySessionMap queries_session_map_
 
static std::map< int, std::shared_ptr< Executor > > executors_
 
static std::atomic_flag execute_spin_lock_ = ATOMIC_FLAG_INIT
 
static mapd_shared_mutex execute_mutex_
 
static mapd_shared_mutex executors_cache_mutex_
 
static mapd_shared_mutex recycler_mutex_
 
static std::unordered_map< std::string, size_t > cardinality_cache_
 

Friends

class BaselineJoinHashTable
 
class CodeGenerator
 
class ColumnFetcher
 
class ExecutionKernel
 
class HashJoin
 
class OverlapsJoinHashTable
 
class GroupByAndAggregate
 
class QueryCompilationDescriptor
 
class QueryMemoryDescriptor
 
class QueryMemoryInitializer
 
class QueryFragmentDescriptor
 
class QueryExecutionContext
 
class ResultSet
 
class InValuesBitmap
 
class LeafAggregator
 
class PerfectJoinHashTable
 
class QueryRewriter
 
class PendingExecutionClosure
 
class RelAlgExecutor
 
class TableOptimizer
 
class TableFunctionCompilationContext
 
class TableFunctionExecutionContext
 
struct TargetExprCodegenBuilder
 
struct TargetExprCodegen
 
class WindowProjectNodeContext
 

Detailed Description

Definition at line 359 of file Execute.h.

Member Typedef Documentation

using Executor::CachedCardinality = std::pair<bool, size_t>

Definition at line 990 of file Execute.h.

using Executor::ExecutorId = size_t

Definition at line 366 of file Execute.h.

using Executor::PerFragmentCallBack = std::function<void(ResultSetPtr, const Fragmenter_Namespace::FragmentInfo &)>
 
Definition at line 538 of file Execute.h.

Constructor & Destructor Documentation

Executor::Executor ( const ExecutorId  id,
const size_t  block_size_x,
const size_t  grid_size_x,
const size_t  max_gpu_slab_size,
const std::string &  debug_dir,
const std::string &  debug_file 
)

Definition at line 142 of file Execute.cpp.

148  : cgen_state_(new CgenState({}, false))
std::unique_ptr< CgenState > cgen_state_
Definition: Execute.h:1010
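In practice the constructor is normally reached through the static getExecutor() factory documented above, which caches executors by ExecutorId. A minimal usage sketch, assuming only the public members listed in this reference (the include path is illustrative):

#include "QueryEngine/Execute.h"  // illustrative include path

// Obtain (or create) the cached unitary executor instead of constructing one;
// debug_dir, debug_file and SystemParameters take their documented defaults.
std::shared_ptr<Executor> acquire_unitary_executor() {
  auto executor = Executor::getExecutor(Executor::UNITARY_EXECUTOR_ID);
  if (executor->isCPUOnly()) {
    // No GPU resources were registered for this executor.
  }
  return executor;
}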

Member Function Documentation

ExecutorMutexHolder Executor::acquireExecuteMutex ( )
inline private

Definition at line 1090 of file Execute.h.

References execute_mutex_, executor_id_, Executor::ExecutorMutexHolder::shared_lock, Executor::ExecutorMutexHolder::unique_lock, and UNITARY_EXECUTOR_ID.

1090  {
1091  ExecutorMutexHolder ret;
1092  if (executor_id_ == UNITARY_EXECUTOR_ID) {
1093  // Only one unitary executor can run at a time
1094  ret.unique_lock = mapd_unique_lock<mapd_shared_mutex>(execute_mutex_);
1095  } else {
1096  ret.shared_lock = mapd_shared_lock<mapd_shared_mutex>(execute_mutex_);
1097  }
1098  return ret;
1099  }
static mapd_shared_mutex execute_mutex_
Definition: Execute.h:1084
const ExecutorId executor_id_
Definition: Execute.h:1053
static const ExecutorId UNITARY_EXECUTOR_ID
Definition: Execute.h:367
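The holder keeps whichever lock was acquired alive until it goes out of scope, so the unitary executor serializes execution while other executors proceed under a shared lock. A self-contained sketch of the same RAII pattern using only the standard library (mapd_shared_mutex is a shared-mutex alias in this code base; the names below are illustrative, not the real ExecutorMutexHolder):

#include <mutex>
#include <shared_mutex>

struct MutexHolder {
  std::unique_lock<std::shared_mutex> unique_lock;  // exclusive: unitary executor
  std::shared_lock<std::shared_mutex> shared_lock;  // shared: all other executors
};

static std::shared_mutex execute_mutex;

MutexHolder acquire(bool is_unitary) {
  MutexHolder ret;
  if (is_unitary) {
    ret.unique_lock = std::unique_lock<std::shared_mutex>(execute_mutex);
  } else {
    ret.shared_lock = std::shared_lock<std::shared_mutex>(execute_mutex);
  }
  return ret;  // whichever lock was taken is released when the holder is destroyed
}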
void Executor::addCodeToCache ( const CodeCacheKey &  key,
std::shared_ptr< CompilationContext >  compilation_context,
llvm::Module *  module,
CodeCache &  cache 
)
static

Definition at line 385 of file NativeCodegen.cpp.

References LruCache< key_t, value_t, hash_t >::put().

Referenced by StubGenerator::generateStub().

388  {
389  cache.put(key,
390  std::make_pair<std::shared_ptr<CompilationContext>, decltype(module)>(
391  std::move(compilation_context), std::move(module)));
392 }
void put(const key_t &key, value_t &&value)
Definition: LruCache.hpp:27
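addCodeToCache() is the insert half of a lookup-or-compile discipline whose lookup half is getCodeFromCache(), listed above. A simplified, self-contained sketch of that discipline; CompiledCode and the map-based cache here are stand-ins, not the real CodeCache / CompilationContext types:

#include <memory>
#include <string>
#include <unordered_map>

// Minimal sketch of the lookup-or-insert flow the code cache follows.
struct CompiledCode {};
using Cache = std::unordered_map<std::string, std::shared_ptr<CompiledCode>>;

std::shared_ptr<CompiledCode> get_or_compile(Cache& cache, const std::string& key) {
  auto it = cache.find(key);
  if (it != cache.end()) {
    return it->second;                           // cache hit: reuse generated code
  }
  auto code = std::make_shared<CompiledCode>();  // stand-in for the actual codegen step
  cache.emplace(key, code);                      // analogue of addCodeToCache()
  return code;
}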


std::tuple< RelAlgExecutionUnit, PlanState::DeletedColumnsMap > Executor::addDeletedColumn ( const RelAlgExecutionUnit &  ra_exe_unit,
const CompilationOptions &  co 
)
private

Definition at line 3432 of file Execute.cpp.

References anonymous_namespace{Execute.cpp}::add_deleted_col_to_map(), catalog_(), CHECK, CompilationOptions::filter_on_deleted_column, and TABLE.

3434  {
3435  if (!co.filter_on_deleted_column) {
3436  return std::make_tuple(ra_exe_unit, PlanState::DeletedColumnsMap{});
3437  }
3438  auto ra_exe_unit_with_deleted = ra_exe_unit;
3439  PlanState::DeletedColumnsMap deleted_cols_map;
3440  for (const auto& input_table : ra_exe_unit_with_deleted.input_descs) {
3441  if (input_table.getSourceType() != InputSourceType::TABLE) {
3442  continue;
3443  }
3444  const auto td = catalog_->getMetadataForTable(input_table.getTableId());
3445  CHECK(td);
3446  const auto deleted_cd = catalog_->getDeletedColumnIfRowsDeleted(td);
3447  if (!deleted_cd) {
3448  continue;
3449  }
3450  CHECK(deleted_cd->columnType.is_boolean());
3451  // check deleted column is not already present
3452  bool found = false;
3453  for (const auto& input_col : ra_exe_unit_with_deleted.input_col_descs) {
3454  if (input_col.get()->getColId() == deleted_cd->columnId &&
3455  input_col.get()->getScanDesc().getTableId() == deleted_cd->tableId &&
3456  input_col.get()->getScanDesc().getNestLevel() == input_table.getNestLevel()) {
3457  found = true;
3458  add_deleted_col_to_map(deleted_cols_map, deleted_cd);
3459  break;
3460  }
3461  }
3462  if (!found) {
3463  // add deleted column
3464  ra_exe_unit_with_deleted.input_col_descs.emplace_back(new InputColDescriptor(
3465  deleted_cd->columnId, deleted_cd->tableId, input_table.getNestLevel()));
3466  add_deleted_col_to_map(deleted_cols_map, deleted_cd);
3467  }
3468  }
3469  return std::make_tuple(ra_exe_unit_with_deleted, deleted_cols_map);
3470 }
const Catalog_Namespace::Catalog * catalog_
Definition: Execute.h:1054
const ColumnDescriptor * getDeletedColumnIfRowsDeleted(const TableDescriptor *td) const
Definition: Catalog.cpp:3086
std::unordered_map< TableId, const ColumnDescriptor * > DeletedColumnsMap
Definition: PlanState.h:44
#define CHECK(condition)
Definition: Logger.h:197
const TableDescriptor * getMetadataForTable(const std::string &tableName, const bool populateFragmenter=true) const
Returns a pointer to a const TableDescriptor struct matching the provided tableName.
void add_deleted_col_to_map(PlanState::DeletedColumnsMap &deleted_cols_map, const ColumnDescriptor *deleted_cd)
Definition: Execute.cpp:3420


llvm::Value * Executor::addJoinLoopIterator ( const std::vector< llvm::Value * > &  prev_iters,
const size_t  level_idx 
)
private

Definition at line 807 of file IRCodegen.cpp.

References AUTOMATIC_IR_METADATA, CodeGenerator::cgen_state_, CHECK, and CgenState::scan_idx_to_hash_pos_.

808  {
810  // Iterators are added for loop-outer joins when the head of the loop is generated,
811  // then once again when the body if generated. Allow this instead of special handling
812  // of call sites.
813  const auto it = cgen_state_->scan_idx_to_hash_pos_.find(level_idx);
814  if (it != cgen_state_->scan_idx_to_hash_pos_.end()) {
815  return it->second;
816  }
817  CHECK(!prev_iters.empty());
818  llvm::Value* matching_row_index = prev_iters.back();
819  const auto it_ok =
820  cgen_state_->scan_idx_to_hash_pos_.emplace(level_idx, matching_row_index);
821  CHECK(it_ok.second);
822  return matching_row_index;
823 }
std::unique_ptr< CgenState > cgen_state_
Definition: Execute.h:1010
#define AUTOMATIC_IR_METADATA(CGENSTATE)
#define CHECK(condition)
Definition: Logger.h:197
void Executor::addToCardinalityCache ( const std::string &  cache_key,
const size_t  cache_value 
)

Definition at line 4165 of file Execute.cpp.

References g_use_estimator_result_cache, and VLOG.

4166  {
4167  if (g_use_estimator_result_cache) {
4168  mapd_unique_lock<mapd_shared_mutex> lock(recycler_mutex_);
4169  cardinality_cache_[cache_key] = cache_value;
4170  VLOG(1) << "Put estimated cardinality to the cache";
4171  }
4172 }
static std::unordered_map< std::string, size_t > cardinality_cache_
Definition: Execute.h:1106
bool g_use_estimator_result_cache
Definition: Execute.cpp:116
static mapd_shared_mutex recycler_mutex_
Definition: Execute.h:1105
#define VLOG(n)
Definition: Logger.h:291
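The cached value is read back through getCachedCardinality(), which returns the CachedCardinality pair documented above; the bool indicates whether the key was found. A hedged usage sketch, with cache-key construction and the fallback value left to the caller (the include path is illustrative):

#include <string>
#include "QueryEngine/Execute.h"  // illustrative include path

// Illustrative only: round-trip an estimate through the public cardinality-cache API.
void remember_estimate(Executor& executor, const std::string& cache_key, size_t estimate) {
  executor.addToCardinalityCache(cache_key, estimate);
}

size_t estimate_or_default(Executor& executor, const std::string& cache_key) {
  const Executor::CachedCardinality cached = executor.getCachedCardinality(cache_key);
  return cached.first ? cached.second : 0;  // first is true when the key was present
}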
bool Executor::addToQuerySessionList ( const QuerySessionId query_session,
const std::string &  query_str,
const std::string &  submitted,
const size_t  executor_id,
const QuerySessionStatus::QueryStatus  query_status,
mapd_unique_lock< mapd_shared_mutex > &  write_lock 
)

Definition at line 3982 of file Execute.cpp.

3987  {
3988  // an internal API that enrolls the query session into the Executor's session map
3989  if (queries_session_map_.count(query_session)) {
3990  if (queries_session_map_.at(query_session).count(submitted_time_str)) {
3991  queries_session_map_.at(query_session).erase(submitted_time_str);
3992  queries_session_map_.at(query_session)
3993  .emplace(submitted_time_str,
3994  QuerySessionStatus(query_session,
3995  executor_id,
3996  query_str,
3997  submitted_time_str,
3998  query_status));
3999  } else {
4000  queries_session_map_.at(query_session)
4001  .emplace(submitted_time_str,
4002  QuerySessionStatus(query_session,
4003  executor_id,
4004  query_str,
4005  submitted_time_str,
4006  query_status));
4007  }
4008  } else {
4009  std::map<std::string, QuerySessionStatus> executor_per_query_map;
4010  executor_per_query_map.emplace(
4011  submitted_time_str,
4012  QuerySessionStatus(
4013  query_session, executor_id, query_str, submitted_time_str, query_status));
4014  queries_session_map_.emplace(query_session, executor_per_query_map);
4015  }
4016  return queries_interrupt_flag_.emplace(query_session, false).second;
4017 }
static QuerySessionMap queries_session_map_
Definition: Execute.h:1077
static InterruptFlagMap queries_interrupt_flag_
Definition: Execute.h:1075
llvm::Value * Executor::aggregateWindowStatePtr ( )
private

Definition at line 124 of file WindowFunctionIR.cpp.

References AUTOMATIC_IR_METADATA, anonymous_namespace{WindowFunctionIR.cpp}::get_adjusted_window_type_info(), get_int_type(), WindowProjectNodeContext::getActiveWindowFunctionContext(), and kFLOAT.

124  {
125  AUTOMATIC_IR_METADATA(cgen_state_.get());
126  const auto window_func_context =
127  WindowProjectNodeContext::getActiveWindowFunctionContext(this);
128  const auto window_func = window_func_context->getWindowFunction();
129  const auto arg_ti = get_adjusted_window_type_info(window_func);
130  llvm::Type* aggregate_state_type =
131  arg_ti.get_type() == kFLOAT
132  ? llvm::PointerType::get(get_int_type(32, cgen_state_->context_), 0)
133  : llvm::PointerType::get(get_int_type(64, cgen_state_->context_), 0);
134  const auto aggregate_state_i64 = cgen_state_->llInt(
135  reinterpret_cast<const int64_t>(window_func_context->aggregateState()));
136  return cgen_state_->ir_builder_.CreateIntToPtr(aggregate_state_i64,
137  aggregate_state_type);
138 }
std::unique_ptr< CgenState > cgen_state_
Definition: Execute.h:1010
llvm::Type * get_int_type(const int width, llvm::LLVMContext &context)
static WindowFunctionContext * getActiveWindowFunctionContext(Executor *executor)
#define AUTOMATIC_IR_METADATA(CGENSTATE)
SQLTypeInfo get_adjusted_window_type_info(const Analyzer::WindowFunction *window_func)


static size_t Executor::align ( const size_t  off_in,
const size_t  alignment 
)
inline static private

Definition at line 1002 of file Execute.h.

1002  {
1003  size_t off = off_in;
1004  if (off % alignment != 0) {
1005  off += (alignment - off % alignment);
1006  }
1007  return off;
1008  }
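Worked examples of the padding arithmetic, in a stand-alone copy (align() itself is private and inline):

#include <cassert>
#include <cstddef>

// Stand-alone copy of the rounding logic above, for illustration only.
size_t align_copy(const size_t off_in, const size_t alignment) {
  size_t off = off_in;
  if (off % alignment != 0) {
    off += (alignment - off % alignment);
  }
  return off;
}

int main() {
  assert(align_copy(13, 8) == 16);  // 13 % 8 == 5, so 3 bytes of padding are added
  assert(align_copy(16, 8) == 16);  // already a multiple of 8, returned unchanged
  return 0;
}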
CurrentQueryStatus Executor::attachExecutorToQuerySession ( const QuerySessionId &  query_session_id,
const std::string &  query_str,
const std::string &  query_submitted_time 
)

Definition at line 3852 of file Execute.cpp.

References executor_id_().

3855  {
3856  if (!query_session_id.empty()) {
3857  // if session is valid, do update 1) the exact executor id and 2) query status
3858  mapd_unique_lock<mapd_shared_mutex> write_lock(executor_session_mutex_);
3859  updateQuerySessionExecutorAssignment(
3860  query_session_id, query_submitted_time, executor_id_, write_lock);
3861  updateQuerySessionStatusWithLock(query_session_id,
3862  query_submitted_time,
3863  QuerySessionStatus::QueryStatus::PENDING_EXECUTOR,
3864  write_lock);
3865  }
3866  return {query_session_id, query_str};
3867 }
bool updateQuerySessionStatusWithLock(const QuerySessionId &query_session, const std::string &submitted_time_str, const QuerySessionStatus::QueryStatus updated_query_status, mapd_unique_lock< mapd_shared_mutex > &write_lock)
Definition: Execute.cpp:4019
static mapd_shared_mutex executor_session_mutex_
Definition: Execute.h:1069
const ExecutorId executor_id_
Definition: Execute.h:1053
bool updateQuerySessionExecutorAssignment(const QuerySessionId &query_session, const std::string &submitted_time_str, const size_t executor_id, mapd_unique_lock< mapd_shared_mutex > &write_lock)
Definition: Execute.cpp:4045
mapd_unique_lock< mapd_shared_mutex > write_lock

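Taken together with checkPendingQueryStatus() and clearQuerySessionStatus(), listed above, this suggests an attach / check / clean-up lifecycle per submitted query. A hedged sketch; the session id and timestamp strings are placeholder values and error handling is omitted:

// Illustrative lifecycle only; real callers sit inside the query dispatch path.
void run_under_session(Executor& executor,
                       const QuerySessionId& session_id,
                       const std::string& query_str,
                       const std::string& submitted_time) {
  executor.attachExecutorToQuerySession(session_id, query_str, submitted_time);
  executor.checkPendingQueryStatus(session_id);  // detect an interrupt raised while pending
  // ... execute the work unit ...
  executor.clearQuerySessionStatus(session_id, submitted_time, /*acquire_spin_lock=*/false);
}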

unsigned Executor::blockSize ( ) const

Definition at line 3339 of file Execute.cpp.

References block_size_x_(), catalog_(), and CHECK.

3339  {
3340  CHECK(catalog_);
3341  const auto cuda_mgr = catalog_->getDataMgr().getCudaMgr();
3342  if (!cuda_mgr) {
3343  return 0;
3344  }
3345  const auto& dev_props = cuda_mgr->getAllDeviceProperties();
3346  return block_size_x_ ? block_size_x_ : dev_props.front().maxThreadsPerBlock;
3347 }
CudaMgr_Namespace::CudaMgr * getCudaMgr() const
Definition: DataMgr.h:207
Data_Namespace::DataMgr & getDataMgr() const
Definition: Catalog.h:222
const Catalog_Namespace::Catalog * catalog_
Definition: Execute.h:1054
const unsigned block_size_x_
Definition: Execute.h:1047
#define CHECK(condition)
Definition: Logger.h:197
const std::vector< DeviceProperties > & getAllDeviceProperties() const
Definition: CudaMgr.h:120

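blockSize() returns the configured block size, falling back to the device's maxThreadsPerBlock when no explicit size was set, and 0 when there is no CUDA manager at all; together with gridSize() it defines the default kernel launch shape. A hedged sketch of how a caller might consume the pair, assuming the zero-return convention shown above:

// Illustrative only.
void report_launch_shape(const Executor& executor) {
  const unsigned block = executor.blockSize();
  const unsigned grid = executor.gridSize();
  if (block == 0 || grid == 0) {
    // No CUDA device available; execution falls back to CPU.
    return;
  }
  const size_t total_threads = static_cast<size_t>(grid) * block;
  (void)total_threads;  // e.g. used to size per-thread output buffers
}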

std::shared_ptr< HashJoin > Executor::buildCurrentLevelHashTable ( const JoinCondition &  current_level_join_conditions,
RelAlgExecutionUnit &  ra_exe_unit,
const CompilationOptions &  co,
const std::vector< InputTableInfo > &  query_infos,
ColumnCacheMap &  column_cache,
std::vector< std::string > &  fail_reasons 
)
private

Definition at line 660 of file IRCodegen.cpp.

References anonymous_namespace{IRCodegen.cpp}::add_qualifier_to_execution_unit(), AUTOMATIC_IR_METADATA, CodeGenerator::cgen_state_, anonymous_namespace{IRCodegen.cpp}::check_valid_join_qual(), Data_Namespace::CPU_LEVEL, CompilationOptions::device_type, Executor::JoinHashTableOrError::fail_reason, GPU, Data_Namespace::GPU_LEVEL, Executor::JoinHashTableOrError::hash_table, INNER, IS_EQUIVALENCE, PlanState::join_info_, OneToOne, CodeGenerator::plan_state_, JoinCondition::quals, RelAlgExecutionUnit::query_hint, and JoinCondition::type.

666  {
668  if (current_level_join_conditions.type != JoinType::INNER &&
669  current_level_join_conditions.quals.size() > 1) {
670  fail_reasons.emplace_back("No equijoin expression found for outer join");
671  return nullptr;
672  }
673  std::shared_ptr<HashJoin> current_level_hash_table;
674  for (const auto& join_qual : current_level_join_conditions.quals) {
675  auto qual_bin_oper = std::dynamic_pointer_cast<Analyzer::BinOper>(join_qual);
676  if (!qual_bin_oper || !IS_EQUIVALENCE(qual_bin_oper->get_optype())) {
677  fail_reasons.emplace_back("No equijoin expression found");
678  if (current_level_join_conditions.type == JoinType::INNER) {
679  add_qualifier_to_execution_unit(ra_exe_unit, join_qual);
680  }
681  continue;
682  }
683  check_valid_join_qual(qual_bin_oper);
684  JoinHashTableOrError hash_table_or_error;
685  if (!current_level_hash_table) {
686  hash_table_or_error = buildHashTableForQualifier(
687  qual_bin_oper,
688  query_infos,
689  co.device_type == ExecutorDeviceType::GPU ? Data_Namespace::GPU_LEVEL
690  : Data_Namespace::CPU_LEVEL,
691  HashType::OneToOne,
692  column_cache,
693  ra_exe_unit.query_hint);
694  current_level_hash_table = hash_table_or_error.hash_table;
695  }
696  if (hash_table_or_error.hash_table) {
697  plan_state_->join_info_.join_hash_tables_.push_back(hash_table_or_error.hash_table);
698  plan_state_->join_info_.equi_join_tautologies_.push_back(qual_bin_oper);
699  } else {
700  fail_reasons.push_back(hash_table_or_error.fail_reason);
701  if (current_level_join_conditions.type == JoinType::INNER) {
702  add_qualifier_to_execution_unit(ra_exe_unit, qual_bin_oper);
703  }
704  }
705  }
706  return current_level_hash_table;
707 }
JoinHashTableOrError buildHashTableForQualifier(const std::shared_ptr< Analyzer::BinOper > &qual_bin_oper, const std::vector< InputTableInfo > &query_infos, const MemoryLevel memory_level, const HashType preferred_hash_type, ColumnCacheMap &column_cache, const QueryHint &query_hint)
Definition: Execute.cpp:3285
#define IS_EQUIVALENCE(X)
Definition: sqldefs.h:67
std::unique_ptr< CgenState > cgen_state_
Definition: Execute.h:1010
std::unique_ptr< PlanState > plan_state_
Definition: Execute.h:1025
void add_qualifier_to_execution_unit(RelAlgExecutionUnit &ra_exe_unit, const std::shared_ptr< Analyzer::Expr > &qual)
Definition: IRCodegen.cpp:230
#define AUTOMATIC_IR_METADATA(CGENSTATE)
ExecutorDeviceType device_type
std::list< std::shared_ptr< Analyzer::Expr > > quals
void check_valid_join_qual(std::shared_ptr< Analyzer::BinOper > &bin_oper)
Definition: IRCodegen.cpp:260


Executor::JoinHashTableOrError Executor::buildHashTableForQualifier ( const std::shared_ptr< Analyzer::BinOper > &  qual_bin_oper,
const std::vector< InputTableInfo > &  query_infos,
const MemoryLevel  memory_level,
const HashType  preferred_hash_type,
ColumnCacheMap &  column_cache,
const QueryHint &  query_hint 
)
private

Definition at line 3285 of file Execute.cpp.

References g_enable_dynamic_watchdog, g_enable_overlaps_hashjoin, and HashJoin::getInstance().

3291  {
3292  if (!g_enable_overlaps_hashjoin && qual_bin_oper->is_overlaps_oper()) {
3293  return {nullptr, "Overlaps hash join disabled, attempting to fall back to loop join"};
3294  }
3295  if (g_enable_dynamic_watchdog && interrupted_.load()) {
3296  throw QueryExecutionError(ERR_INTERRUPTED);
3297  }
3298  try {
3299  auto tbl = HashJoin::getInstance(qual_bin_oper,
3300  query_infos,
3301  memory_level,
3302  preferred_hash_type,
3303  deviceCountForMemoryLevel(memory_level),
3304  column_cache,
3305  this,
3306  query_hint);
3307  return {tbl, ""};
3308  } catch (const HashJoinFail& e) {
3309  return {nullptr, e.what()};
3310  }
3311 }
static const int32_t ERR_INTERRUPTED
Definition: Execute.h:1117
bool g_enable_dynamic_watchdog
Definition: Execute.cpp:77
bool g_enable_overlaps_hashjoin
Definition: Execute.cpp:96
static std::shared_ptr< HashJoin > getInstance(const std::shared_ptr< Analyzer::BinOper > qual_bin_oper, const std::vector< InputTableInfo > &query_infos, const Data_Namespace::MemoryLevel memory_level, const HashType preferred_hash_type, const int device_count, ColumnCacheMap &column_cache, Executor *executor, const QueryHint &query_hint)
Make hash table from an in-flight SQL query's parse tree etc.
Definition: HashJoin.cpp:238
static std::atomic< bool > interrupted_
Definition: Execute.h:1034
int deviceCountForMemoryLevel(const Data_Namespace::MemoryLevel memory_level) const
Definition: Execute.cpp:653


JoinLoop::HoistedFiltersCallback Executor::buildHoistLeftHandSideFiltersCb ( const RelAlgExecutionUnit &  ra_exe_unit,
const size_t  level_idx,
const int  inner_table_id,
const CompilationOptions &  co 
)
private

Definition at line 492 of file IRCodegen.cpp.

References AUTOMATIC_IR_METADATA, CodeGenerator::cgen_state_, CHECK, CodeGenerator::codegen(), g_enable_left_join_filter_hoisting, PlanState::hoisted_filters_, RelAlgExecutionUnit::join_quals, LEFT, CgenState::llBool(), CodeGenerator::plan_state_, RelAlgExecutionUnit::quals, RelAlgExecutionUnit::simple_quals, CodeGenerator::toBool(), and VLOG.

496  {
497  if (!g_enable_left_join_filter_hoisting) {
498  return nullptr;
499  }
500 
501  const auto& current_level_join_conditions = ra_exe_unit.join_quals[level_idx];
502  if (current_level_join_conditions.type == JoinType::LEFT) {
503  const auto& condition = current_level_join_conditions.quals.front();
504  const auto bin_oper = dynamic_cast<const Analyzer::BinOper*>(condition.get());
505  CHECK(bin_oper) << condition->toString();
506  const auto rhs =
507  dynamic_cast<const Analyzer::ColumnVar*>(bin_oper->get_right_operand());
508  const auto lhs =
509  dynamic_cast<const Analyzer::ColumnVar*>(bin_oper->get_left_operand());
510  if (lhs && rhs && lhs->get_table_id() != rhs->get_table_id()) {
511  const Analyzer::ColumnVar* selected_lhs{nullptr};
512  // grab the left hand side column -- this is somewhat similar to normalize column
513  // pair, and a better solution may be to hoist that function out of the join
514  // framework and normalize columns at the top of build join loops
515  if (lhs->get_table_id() == inner_table_id) {
516  selected_lhs = rhs;
517  } else if (rhs->get_table_id() == inner_table_id) {
518  selected_lhs = lhs;
519  }
520  if (selected_lhs) {
521  std::list<std::shared_ptr<Analyzer::Expr>> hoisted_quals;
522  // get all LHS-only filters
523  auto should_hoist_qual = [&hoisted_quals, &selected_lhs](const auto& qual,
524  const int table_id) {
525  CHECK(qual);
526 
527  ExprTableIdVisitor visitor;
528  const auto table_ids = visitor.visit(qual.get());
529  if (table_ids.size() == 1 && table_ids.find(table_id) != table_ids.end()) {
530  hoisted_quals.push_back(qual);
531  }
532  };
533  for (const auto& qual : ra_exe_unit.simple_quals) {
534  should_hoist_qual(qual, selected_lhs->get_table_id());
535  }
536  for (const auto& qual : ra_exe_unit.quals) {
537  should_hoist_qual(qual, selected_lhs->get_table_id());
538  }
539 
540  // build the filters callback and return it
541  if (!hoisted_quals.empty()) {
542  return [this, hoisted_quals, co](llvm::BasicBlock* true_bb,
543  llvm::BasicBlock* exit_bb,
544  const std::string& loop_name,
545  llvm::Function* parent_func,
546  CgenState* cgen_state) -> llvm::BasicBlock* {
547  // make sure we have quals to hoist
548  bool has_quals_to_hoist = false;
549  for (const auto& qual : hoisted_quals) {
550  // check to see if the filter was previously hoisted. if all filters were
551  // previously hoisted, this callback becomes a noop
552  if (plan_state_->hoisted_filters_.count(qual) == 0) {
553  has_quals_to_hoist = true;
554  break;
555  }
556  }
557 
558  if (!has_quals_to_hoist) {
559  return nullptr;
560  }
561 
562  AUTOMATIC_IR_METADATA(cgen_state);
563 
564  llvm::IRBuilder<>& builder = cgen_state->ir_builder_;
565  auto& context = builder.getContext();
566 
567  const auto filter_bb =
568  llvm::BasicBlock::Create(context,
569  "hoisted_left_join_filters_" + loop_name,
570  parent_func,
571  /*insert_before=*/true_bb);
572  builder.SetInsertPoint(filter_bb);
573 
574  llvm::Value* filter_lv = cgen_state_->llBool(true);
575  CodeGenerator code_generator(this);
577  for (const auto& qual : hoisted_quals) {
578  if (plan_state_->hoisted_filters_.insert(qual).second) {
579  // qual was inserted into the hoisted filters map, which means we have not
580  // seen this qual before. Generate filter.
581  VLOG(1) << "Generating code for hoisted left hand side qualifier "
582  << qual->toString();
583  auto cond = code_generator.toBool(
584  code_generator.codegen(qual.get(), true, co).front());
585  filter_lv = builder.CreateAnd(filter_lv, cond);
586  }
587  }
588  CHECK(filter_lv->getType()->isIntegerTy(1));
589 
590  builder.CreateCondBr(filter_lv, true_bb, exit_bb);
591  return filter_bb;
592  };
593  }
594  }
595  }
596  }
597  return nullptr;
598 }
bool g_enable_left_join_filter_hoisting
Definition: Execute.cpp:94
std::unique_ptr< CgenState > cgen_state_
Definition: Execute.h:1010
const JoinQualsPerNestingLevel join_quals
std::unique_ptr< PlanState > plan_state_
Definition: Execute.h:1025
#define AUTOMATIC_IR_METADATA(CGENSTATE)
std::list< std::shared_ptr< Analyzer::Expr > > quals
#define CHECK(condition)
Definition: Logger.h:197
#define VLOG(n)
Definition: Logger.h:291
std::list< std::shared_ptr< Analyzer::Expr > > simple_quals


std::function< llvm::Value *(const std::vector< llvm::Value * > &, llvm::Value *)> Executor::buildIsDeletedCb ( const RelAlgExecutionUnit &  ra_exe_unit,
const size_t  level_idx,
const CompilationOptions &  co 
)
private

Definition at line 601 of file IRCodegen.cpp.

References AUTOMATIC_IR_METADATA, CodeGenerator::cgen_state_, CHECK, CHECK_LT, CodeGenerator::codegen(), CgenState::context_, CgenState::current_func_, CompilationOptions::filter_on_deleted_column, PlanState::getDeletedColForTable(), RelAlgExecutionUnit::input_descs, CgenState::ir_builder_, CgenState::llBool(), CgenState::llInt(), CodeGenerator::plan_state_, TABLE, and CodeGenerator::toBool().

603  {
605  if (!co.filter_on_deleted_column) {
606  return nullptr;
607  }
608  CHECK_LT(level_idx + 1, ra_exe_unit.input_descs.size());
609  const auto input_desc = ra_exe_unit.input_descs[level_idx + 1];
610  if (input_desc.getSourceType() != InputSourceType::TABLE) {
611  return nullptr;
612  }
613 
614  const auto deleted_cd = plan_state_->getDeletedColForTable(input_desc.getTableId());
615  if (!deleted_cd) {
616  return nullptr;
617  }
618  CHECK(deleted_cd->columnType.is_boolean());
619  const auto deleted_expr = makeExpr<Analyzer::ColumnVar>(deleted_cd->columnType,
620  input_desc.getTableId(),
621  deleted_cd->columnId,
622  input_desc.getNestLevel());
623  return [this, deleted_expr, level_idx, &co](const std::vector<llvm::Value*>& prev_iters,
624  llvm::Value* have_more_inner_rows) {
625  const auto matching_row_index = addJoinLoopIterator(prev_iters, level_idx + 1);
626  // Avoid fetching the deleted column from a position which is not valid.
627  // An invalid position can be returned by a one to one hash lookup (negative)
628  // or at the end of iteration over a set of matching values.
629  llvm::Value* is_valid_it{nullptr};
630  if (have_more_inner_rows) {
631  is_valid_it = have_more_inner_rows;
632  } else {
633  is_valid_it = cgen_state_->ir_builder_.CreateICmp(
634  llvm::ICmpInst::ICMP_SGE, matching_row_index, cgen_state_->llInt<int64_t>(0));
635  }
636  const auto it_valid_bb = llvm::BasicBlock::Create(
637  cgen_state_->context_, "it_valid", cgen_state_->current_func_);
638  const auto it_not_valid_bb = llvm::BasicBlock::Create(
639  cgen_state_->context_, "it_not_valid", cgen_state_->current_func_);
640  cgen_state_->ir_builder_.CreateCondBr(is_valid_it, it_valid_bb, it_not_valid_bb);
641  const auto row_is_deleted_bb = llvm::BasicBlock::Create(
642  cgen_state_->context_, "row_is_deleted", cgen_state_->current_func_);
643  cgen_state_->ir_builder_.SetInsertPoint(it_valid_bb);
644  CodeGenerator code_generator(this);
645  const auto row_is_deleted = code_generator.toBool(
646  code_generator.codegen(deleted_expr.get(), true, co).front());
647  cgen_state_->ir_builder_.CreateBr(row_is_deleted_bb);
648  cgen_state_->ir_builder_.SetInsertPoint(it_not_valid_bb);
649  const auto row_is_deleted_default = cgen_state_->llBool(false);
650  cgen_state_->ir_builder_.CreateBr(row_is_deleted_bb);
651  cgen_state_->ir_builder_.SetInsertPoint(row_is_deleted_bb);
652  auto row_is_deleted_or_default =
653  cgen_state_->ir_builder_.CreatePHI(row_is_deleted->getType(), 2);
654  row_is_deleted_or_default->addIncoming(row_is_deleted, it_valid_bb);
655  row_is_deleted_or_default->addIncoming(row_is_deleted_default, it_not_valid_bb);
656  return row_is_deleted_or_default;
657  };
658 }
std::vector< InputDescriptor > input_descs
std::unique_ptr< CgenState > cgen_state_
Definition: Execute.h:1010
std::unique_ptr< PlanState > plan_state_
Definition: Execute.h:1025
#define AUTOMATIC_IR_METADATA(CGENSTATE)
#define CHECK_LT(x, y)
Definition: Logger.h:207
llvm::Value * addJoinLoopIterator(const std::vector< llvm::Value * > &prev_iters, const size_t level_idx)
Definition: IRCodegen.cpp:807
#define CHECK(condition)
Definition: Logger.h:197


std::vector< JoinLoop > Executor::buildJoinLoops ( RelAlgExecutionUnit &  ra_exe_unit,
const CompilationOptions &  co,
const ExecutionOptions &  eo,
const std::vector< InputTableInfo > &  query_infos,
ColumnCacheMap &  column_cache 
)
private

Definition at line 279 of file IRCodegen.cpp.

References AUTOMATIC_IR_METADATA, CodeGenerator::cgen_state_, CHECK, CHECK_LT, CodeGenerator::codegen(), INJECT_TIMER, CgenState::ir_builder_, RelAlgExecutionUnit::join_quals, LEFT, CgenState::llBool(), OneToOne, CgenState::outer_join_match_found_per_level_, Set, Singleton, JoinLoopDomain::slot_lookup_result, CodeGenerator::toBool(), JoinCondition::type, and JoinLoopDomain::values_buffer.

284  {
287  std::vector<JoinLoop> join_loops;
288  for (size_t level_idx = 0, current_hash_table_idx = 0;
289  level_idx < ra_exe_unit.join_quals.size();
290  ++level_idx) {
291  const auto& current_level_join_conditions = ra_exe_unit.join_quals[level_idx];
292  std::vector<std::string> fail_reasons;
293  const auto build_cur_level_hash_table = [&]() {
294  if (current_level_join_conditions.quals.size() > 1) {
295  const auto first_qual = *current_level_join_conditions.quals.begin();
296  auto qual_bin_oper =
297  std::dynamic_pointer_cast<const Analyzer::BinOper>(first_qual);
298  if (qual_bin_oper && qual_bin_oper->is_overlaps_oper() &&
299  current_level_join_conditions.type == JoinType::LEFT) {
300  JoinCondition join_condition{{first_qual}, current_level_join_conditions.type};
301 
302  return buildCurrentLevelHashTable(
303  join_condition, ra_exe_unit, co, query_infos, column_cache, fail_reasons);
304  }
305  }
306  return buildCurrentLevelHashTable(current_level_join_conditions,
307  ra_exe_unit,
308  co,
309  query_infos,
310  column_cache,
311  fail_reasons);
312  };
313  const auto current_level_hash_table = build_cur_level_hash_table();
314  const auto found_outer_join_matches_cb =
315  [this, level_idx](llvm::Value* found_outer_join_matches) {
316  CHECK_LT(level_idx, cgen_state_->outer_join_match_found_per_level_.size());
317  CHECK(!cgen_state_->outer_join_match_found_per_level_[level_idx]);
318  cgen_state_->outer_join_match_found_per_level_[level_idx] =
319  found_outer_join_matches;
320  };
321  const auto is_deleted_cb = buildIsDeletedCb(ra_exe_unit, level_idx, co);
322  const auto outer_join_condition_multi_quals_cb =
323  [this, level_idx, &co, &current_level_join_conditions](
324  const std::vector<llvm::Value*>& prev_iters) {
325  // The values generated for the match path don't dominate all uses
326  // since on the non-match path nulls are generated. Reset the cache
327  // once the condition is generated to avoid incorrect reuse.
328  FetchCacheAnchor anchor(cgen_state_.get());
329  addJoinLoopIterator(prev_iters, level_idx + 1);
330  llvm::Value* left_join_cond = cgen_state_->llBool(true);
331  CodeGenerator code_generator(this);
332  // Do not want to look at all quals! only 1..N quals (ignore first qual)
333  // Note(jclay): this may need to support cases larger than 2
334  // are there any?
335  if (current_level_join_conditions.quals.size() >= 2) {
336  auto qual_it = std::next(current_level_join_conditions.quals.begin(), 1);
337  for (; qual_it != current_level_join_conditions.quals.end();
338  std::advance(qual_it, 1)) {
339  left_join_cond = cgen_state_->ir_builder_.CreateAnd(
340  left_join_cond,
341  code_generator.toBool(
342  code_generator.codegen(qual_it->get(), true, co).front()));
343  }
344  }
345  return left_join_cond;
346  };
347  if (current_level_hash_table) {
348  const auto hoisted_filters_cb = buildHoistLeftHandSideFiltersCb(
349  ra_exe_unit, level_idx, current_level_hash_table->getInnerTableId(), co);
350  if (current_level_hash_table->getHashType() == HashType::OneToOne) {
351  join_loops.emplace_back(
352  /*kind=*/JoinLoopKind::Singleton,
353  /*type=*/current_level_join_conditions.type,
354  /*iteration_domain_codegen=*/
355  [this, current_hash_table_idx, level_idx, current_level_hash_table, &co](
356  const std::vector<llvm::Value*>& prev_iters) {
357  addJoinLoopIterator(prev_iters, level_idx);
358  JoinLoopDomain domain{{0}};
359  domain.slot_lookup_result =
360  current_level_hash_table->codegenSlot(co, current_hash_table_idx);
361  return domain;
362  },
363  /*outer_condition_match=*/nullptr,
364  /*found_outer_matches=*/current_level_join_conditions.type == JoinType::LEFT
365  ? std::function<void(llvm::Value*)>(found_outer_join_matches_cb)
366  : nullptr,
367  /*hoisted_filters=*/hoisted_filters_cb,
368  /*is_deleted=*/is_deleted_cb);
369  } else {
370  join_loops.emplace_back(
371  /*kind=*/JoinLoopKind::Set,
372  /*type=*/current_level_join_conditions.type,
373  /*iteration_domain_codegen=*/
374  [this, current_hash_table_idx, level_idx, current_level_hash_table, &co](
375  const std::vector<llvm::Value*>& prev_iters) {
376  addJoinLoopIterator(prev_iters, level_idx);
377  JoinLoopDomain domain{{0}};
378  const auto matching_set = current_level_hash_table->codegenMatchingSet(
379  co, current_hash_table_idx);
380  domain.values_buffer = matching_set.elements;
381  domain.element_count = matching_set.count;
382  return domain;
383  },
384  /*outer_condition_match=*/
385  current_level_join_conditions.type == JoinType::LEFT
386  ? std::function<llvm::Value*(const std::vector<llvm::Value*>&)>(
387  outer_join_condition_multi_quals_cb)
388  : nullptr,
389  /*found_outer_matches=*/current_level_join_conditions.type == JoinType::LEFT
390  ? std::function<void(llvm::Value*)>(found_outer_join_matches_cb)
391  : nullptr,
392  /*hoisted_filters=*/hoisted_filters_cb,
393  /*is_deleted=*/is_deleted_cb);
394  }
395  ++current_hash_table_idx;
396  } else {
397  const auto fail_reasons_str = current_level_join_conditions.quals.empty()
398  ? "No equijoin expression found"
399  : boost::algorithm::join(fail_reasons, " | ");
400  check_if_loop_join_is_allowed(
401  ra_exe_unit, eo, query_infos, level_idx, fail_reasons_str);
402  // Callback provided to the `JoinLoop` framework to evaluate the (outer) join
403  // condition.
404  VLOG(1) << "Unable to build hash table, falling back to loop join: "
405  << fail_reasons_str;
406  const auto outer_join_condition_cb =
407  [this, level_idx, &co, &current_level_join_conditions](
408  const std::vector<llvm::Value*>& prev_iters) {
409  // The values generated for the match path don't dominate all uses
410  // since on the non-match path nulls are generated. Reset the cache
411  // once the condition is generated to avoid incorrect reuse.
412  FetchCacheAnchor anchor(cgen_state_.get());
413  addJoinLoopIterator(prev_iters, level_idx + 1);
414  llvm::Value* left_join_cond = cgen_state_->llBool(true);
415  CodeGenerator code_generator(this);
416  for (auto expr : current_level_join_conditions.quals) {
417  left_join_cond = cgen_state_->ir_builder_.CreateAnd(
418  left_join_cond,
419  code_generator.toBool(
420  code_generator.codegen(expr.get(), true, co).front()));
421  }
422  return left_join_cond;
423  };
424  join_loops.emplace_back(
425  /*kind=*/JoinLoopKind::UpperBound,
426  /*type=*/current_level_join_conditions.type,
427  /*iteration_domain_codegen=*/
428  [this, level_idx](const std::vector<llvm::Value*>& prev_iters) {
429  addJoinLoopIterator(prev_iters, level_idx);
430  JoinLoopDomain domain{{0}};
431  const auto rows_per_scan_ptr = cgen_state_->ir_builder_.CreateGEP(
432  get_arg_by_name(cgen_state_->row_func_, "num_rows_per_scan"),
433  cgen_state_->llInt(int32_t(level_idx + 1)));
434  domain.upper_bound = cgen_state_->ir_builder_.CreateLoad(rows_per_scan_ptr,
435  "num_rows_per_scan");
436  return domain;
437  },
438  /*outer_condition_match=*/
439  current_level_join_conditions.type == JoinType::LEFT
440  ? std::function<llvm::Value*(const std::vector<llvm::Value*>&)>(
441  outer_join_condition_cb)
442  : nullptr,
443  /*found_outer_matches=*/
444  current_level_join_conditions.type == JoinType::LEFT
445  ? std::function<void(llvm::Value*)>(found_outer_join_matches_cb)
446  : nullptr,
447  /*hoisted_filters=*/nullptr,
448  /*is_deleted=*/is_deleted_cb);
449  }
450  }
451  return join_loops;
452 }
std::shared_ptr< HashJoin > buildCurrentLevelHashTable(const JoinCondition &current_level_join_conditions, RelAlgExecutionUnit &ra_exe_unit, const CompilationOptions &co, const std::vector< InputTableInfo > &query_infos, ColumnCacheMap &column_cache, std::vector< std::string > &fail_reasons)
Definition: IRCodegen.cpp:660
llvm::Value * values_buffer
Definition: JoinLoop.h:48
std::string join(T const &container, std::string const &delim)
std::unique_ptr< CgenState > cgen_state_
Definition: Execute.h:1010
llvm::Value * get_arg_by_name(llvm::Function *func, const std::string &name)
Definition: Execute.h:168
#define INJECT_TIMER(DESC)
Definition: measure.h:93
const JoinQualsPerNestingLevel join_quals
#define AUTOMATIC_IR_METADATA(CGENSTATE)
llvm::Value * slot_lookup_result
Definition: JoinLoop.h:46
#define CHECK_LT(x, y)
Definition: Logger.h:207
llvm::Value * addJoinLoopIterator(const std::vector< llvm::Value * > &prev_iters, const size_t level_idx)
Definition: IRCodegen.cpp:807
#define CHECK(condition)
Definition: Logger.h:197
void check_if_loop_join_is_allowed(RelAlgExecutionUnit &ra_exe_unit, const ExecutionOptions &eo, const std::vector< InputTableInfo > &query_infos, const size_t level_idx, const std::string &fail_reason)
Definition: IRCodegen.cpp:240
JoinLoop::HoistedFiltersCallback buildHoistLeftHandSideFiltersCb(const RelAlgExecutionUnit &ra_exe_unit, const size_t level_idx, const int inner_table_id, const CompilationOptions &co)
Definition: IRCodegen.cpp:492
std::vector< JoinLoop > buildJoinLoops(RelAlgExecutionUnit &ra_exe_unit, const CompilationOptions &co, const ExecutionOptions &eo, const std::vector< InputTableInfo > &query_infos, ColumnCacheMap &column_cache)
Definition: IRCodegen.cpp:279
std::function< llvm::Value *(const std::vector< llvm::Value * > &, llvm::Value *)> buildIsDeletedCb(const RelAlgExecutionUnit &ra_exe_unit, const size_t level_idx, const CompilationOptions &co)
Definition: IRCodegen.cpp:601
#define VLOG(n)
Definition: Logger.h:291
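
The three JoinLoopKind values built above (Singleton for one-to-one hash tables, Set for one-to-many hash tables, UpperBound for the loop-join fallback) determine the shape of the loop nest that codegenJoinLoops() later emits. A minimal host-side sketch of that shape, with hypothetical probe_one_to_one(), num_rows_per_scan and row_func callbacks standing in for the hash lookup and the generated row function (this models the nesting only, not the actual LLVM IR):

#include <cstdint>
#include <functional>
#include <vector>

// Sketch only: two join levels, one Singleton and one UpperBound.
void nested_join_loops_sketch(int64_t outer_row_count,
                              const std::vector<int64_t>& num_rows_per_scan,
                              const std::function<int64_t(int64_t)>& probe_one_to_one,
                              const std::function<void(int64_t, int64_t, int64_t)>& row_func) {
  for (int64_t outer_pos = 0; outer_pos < outer_row_count; ++outer_pos) {
    const int64_t match = probe_one_to_one(outer_pos);  // Singleton: at most one match
    if (match < 0) {
      continue;  // inner join: no match, skip the row
    }
    for (int64_t inner_pos = 0; inner_pos < num_rows_per_scan[1]; ++inner_pos) {
      // UpperBound: scan every row of the next nesting level; the quals filter inside
      row_func(outer_pos, match, inner_pos);  // loop body built by compileBody()
    }
  }
}
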


void Executor::buildSelectedFragsMapping ( std::vector< std::vector< size_t >> &  selected_fragments_crossjoin,
std::vector< size_t > &  local_col_to_frag_pos,
const std::list< std::shared_ptr< const InputColDescriptor >> &  col_global_ids,
const FragmentsList selected_fragments,
const RelAlgExecutionUnit ra_exe_unit 
)
private

Definition at line 2756 of file Execute.cpp.

References CHECK, CHECK_EQ, CHECK_LT, and RelAlgExecutionUnit::input_descs.

2761  {
2762  local_col_to_frag_pos.resize(plan_state_->global_to_local_col_ids_.size());
2763  size_t frag_pos{0};
2764  const auto& input_descs = ra_exe_unit.input_descs;
2765  for (size_t scan_idx = 0; scan_idx < input_descs.size(); ++scan_idx) {
2766  const int table_id = input_descs[scan_idx].getTableId();
2767  CHECK_EQ(selected_fragments[scan_idx].table_id, table_id);
2768  selected_fragments_crossjoin.push_back(
2769  getFragmentCount(selected_fragments, scan_idx, ra_exe_unit));
2770  for (const auto& col_id : col_global_ids) {
2771  CHECK(col_id);
2772  const auto& input_desc = col_id->getScanDesc();
2773  if (input_desc.getTableId() != table_id ||
2774  input_desc.getNestLevel() != static_cast<int>(scan_idx)) {
2775  continue;
2776  }
2777  auto it = plan_state_->global_to_local_col_ids_.find(*col_id);
2778  CHECK(it != plan_state_->global_to_local_col_ids_.end());
2779  CHECK_LT(static_cast<size_t>(it->second),
2780  plan_state_->global_to_local_col_ids_.size());
2781  local_col_to_frag_pos[it->second] = frag_pos;
2782  }
2783  ++frag_pos;
2784  }
2785 }
#define CHECK_EQ(x, y)
Definition: Logger.h:205
std::vector< InputDescriptor > input_descs
std::unique_ptr< PlanState > plan_state_
Definition: Execute.h:1025
#define CHECK_LT(x, y)
Definition: Logger.h:207
std::vector< size_t > getFragmentCount(const FragmentsList &selected_fragments, const size_t scan_idx, const RelAlgExecutionUnit &ra_exe_unit)
Definition: Execute.cpp:2742
#define CHECK(condition)
Definition: Logger.h:197
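
The net effect of the loop above is a map from each local column id to the position (frag_pos) of the scan that owns it, alongside one fragment list per scan for the fragment cross-join. A standalone sketch of that mapping, with plain integers standing in for InputColDescriptor nest levels (an illustration, not executor code):

#include <cstddef>
#include <vector>

// Sketch only: col_owner_scan[i] is the nest level that owns local column i.
std::vector<std::size_t> map_local_cols_to_frag_pos(
    const std::vector<std::size_t>& col_owner_scan, std::size_t num_scans) {
  std::vector<std::size_t> local_col_to_frag_pos(col_owner_scan.size());
  std::size_t frag_pos = 0;
  for (std::size_t scan_idx = 0; scan_idx < num_scans; ++scan_idx) {
    for (std::size_t col = 0; col < col_owner_scan.size(); ++col) {
      if (col_owner_scan[col] == scan_idx) {
        local_col_to_frag_pos[col] = frag_pos;  // column reads from this scan's slot
      }
    }
    ++frag_pos;  // one slot per scan, in input_descs order
  }
  return local_col_to_frag_pos;
}
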
void Executor::buildSelectedFragsMappingForUnion ( std::vector< std::vector< size_t >> &  selected_fragments_crossjoin,
std::vector< size_t > &  local_col_to_frag_pos,
const std::list< std::shared_ptr< const InputColDescriptor >> &  col_global_ids,
const FragmentsList selected_fragments,
const RelAlgExecutionUnit ra_exe_unit 
)
private

Definition at line 2787 of file Execute.cpp.

References CHECK, CHECK_LT, and RelAlgExecutionUnit::input_descs.

2792  {
2793  local_col_to_frag_pos.resize(plan_state_->global_to_local_col_ids_.size());
2794  size_t frag_pos{0};
2795  const auto& input_descs = ra_exe_unit.input_descs;
2796  for (size_t scan_idx = 0; scan_idx < input_descs.size(); ++scan_idx) {
2797  const int table_id = input_descs[scan_idx].getTableId();
2798  // selected_fragments here is from assignFragsToKernelDispatch
2799  // execution_kernel.fragments
2800  if (selected_fragments[0].table_id != table_id) { // TODO 0
2801  continue;
2802  }
2803  // CHECK_EQ(selected_fragments[scan_idx].table_id, table_id);
2804  selected_fragments_crossjoin.push_back(
2805  // getFragmentCount(selected_fragments, scan_idx, ra_exe_unit));
2806  {size_t(1)}); // TODO
2807  for (const auto& col_id : col_global_ids) {
2808  CHECK(col_id);
2809  const auto& input_desc = col_id->getScanDesc();
2810  if (input_desc.getTableId() != table_id ||
2811  input_desc.getNestLevel() != static_cast<int>(scan_idx)) {
2812  continue;
2813  }
2814  auto it = plan_state_->global_to_local_col_ids_.find(*col_id);
2815  CHECK(it != plan_state_->global_to_local_col_ids_.end());
2816  CHECK_LT(static_cast<size_t>(it->second),
2817  plan_state_->global_to_local_col_ids_.size());
2818  local_col_to_frag_pos[it->second] = frag_pos;
2819  }
2820  ++frag_pos;
2821  }
2822 }
std::vector< InputDescriptor > input_descs
std::unique_ptr< PlanState > plan_state_
Definition: Execute.h:1025
#define CHECK_LT(x, y)
Definition: Logger.h:207
#define CHECK(condition)
Definition: Logger.h:197
llvm::Value * Executor::castToFP ( llvm::Value *  value,
SQLTypeInfo const from_ti,
SQLTypeInfo const to_ti 
)
private

Definition at line 3361 of file Execute.cpp.

References AUTOMATIC_IR_METADATA, exp_to_scale(), logger::FATAL, SQLTypeInfo::get_scale(), SQLTypeInfo::get_size(), SQLTypeInfo::is_fp(), SQLTypeInfo::is_number(), and LOG.

3363  {
3365  if (value->getType()->isIntegerTy() && from_ti.is_number() && to_ti.is_fp() &&
3366  (!from_ti.is_fp() || from_ti.get_size() != to_ti.get_size())) {
3367  llvm::Type* fp_type{nullptr};
3368  switch (to_ti.get_size()) {
3369  case 4:
3370  fp_type = llvm::Type::getFloatTy(cgen_state_->context_);
3371  break;
3372  case 8:
3373  fp_type = llvm::Type::getDoubleTy(cgen_state_->context_);
3374  break;
3375  default:
3376  LOG(FATAL) << "Unsupported FP size: " << to_ti.get_size();
3377  }
3378  value = cgen_state_->ir_builder_.CreateSIToFP(value, fp_type);
3379  if (from_ti.get_scale()) {
3380  value = cgen_state_->ir_builder_.CreateFDiv(
3381  value,
3382  llvm::ConstantFP::get(value->getType(), exp_to_scale(from_ti.get_scale())));
3383  }
3384  }
3385  return value;
3386 }
#define LOG(tag)
Definition: Logger.h:188
std::unique_ptr< CgenState > cgen_state_
Definition: Execute.h:1010
#define AUTOMATIC_IR_METADATA(CGENSTATE)
uint64_t exp_to_scale(const unsigned exp)
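
For scaled (DECIMAL) inputs the SIToFP result is divided by exp_to_scale(scale), turning the fixed-point storage value into its true numeric value. A minimal standalone sketch of that arithmetic, assuming exp_to_scale(s) == 10^s as it is used here:

#include <cstdint>
#include <iostream>

// Sketch only: models the SIToFP + FDiv sequence emitted above for DECIMAL inputs.
double cast_decimal_to_fp_sketch(int64_t storage_value, unsigned scale) {
  double fp_value = static_cast<double>(storage_value);  // CreateSIToFP
  double divisor = 1.0;
  for (unsigned i = 0; i < scale; ++i) {
    divisor *= 10.0;  // exp_to_scale(scale)
  }
  return fp_value / divisor;  // CreateFDiv
}

int main() {
  // DECIMAL(10, 2) stores 123.45 as the integer 12345
  std::cout << cast_decimal_to_fp_sketch(12345, 2) << '\n';  // prints 123.45
}
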


llvm::Value * Executor::castToIntPtrTyIn ( llvm::Value *  val,
const size_t  bit_width 
)
private

Definition at line 3388 of file Execute.cpp.

References AUTOMATIC_IR_METADATA, CHECK, CHECK_LT, and get_int_type().

3388  {
3390  CHECK(val->getType()->isPointerTy());
3391 
3392  const auto val_ptr_type = static_cast<llvm::PointerType*>(val->getType());
3393  const auto val_type = val_ptr_type->getElementType();
3394  size_t val_width = 0;
3395  if (val_type->isIntegerTy()) {
3396  val_width = val_type->getIntegerBitWidth();
3397  } else {
3398  if (val_type->isFloatTy()) {
3399  val_width = 32;
3400  } else {
3401  CHECK(val_type->isDoubleTy());
3402  val_width = 64;
3403  }
3404  }
3405  CHECK_LT(size_t(0), val_width);
3406  if (bitWidth == val_width) {
3407  return val;
3408  }
3409  return cgen_state_->ir_builder_.CreateBitCast(
3410  val, llvm::PointerType::get(get_int_type(bitWidth, cgen_state_->context_), 0));
3411 }
std::unique_ptr< CgenState > cgen_state_
Definition: Execute.h:1010
llvm::Type * get_int_type(const int width, llvm::LLVMContext &context)
#define AUTOMATIC_IR_METADATA(CGENSTATE)
#define CHECK_LT(x, y)
Definition: Logger.h:207
#define CHECK(condition)
Definition: Logger.h:197
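
The routine never touches the pointed-to data; when the widths differ it only reinterprets the pointer as an integer pointer of the requested bit width. The host-side analogue of that bitcast, as a sketch:

#include <cstdint>

// Sketch only: a double is 64 bits wide, so viewing the buffer through an int64_t*
// mirrors the CreateBitCast above -- the bytes are untouched, only the type changes.
const int64_t* as_int64_ptr_sketch(const double* buffer) {
  return reinterpret_cast<const int64_t*>(buffer);
}
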


bool Executor::checkCurrentQuerySession ( const std::string &  candidate_query_session,
mapd_shared_lock< mapd_shared_mutex > &  read_lock 
)

Definition at line 3838 of file Execute.cpp.

3839  {
3840  // the candidate query session is treated as the current session only when it
3841  // is non-empty and equal to current_query_session_
3842  return !candidate_query_session.empty() &&
3843  (current_query_session_ == candidate_query_session);
3844 }
static QuerySessionId current_query_session_
Definition: Execute.h:1071
bool Executor::checkIsQuerySessionEnrolled ( const QuerySessionId query_session,
mapd_shared_lock< mapd_shared_mutex > &  read_lock 
)

Definition at line 4141 of file Execute.cpp.

4143  {
4144  if (query_session.empty()) {
4145  return false;
4146  }
4147  return !query_session.empty() && queries_session_map_.count(query_session);
4148 }
static QuerySessionMap queries_session_map_
Definition: Execute.h:1077
bool Executor::checkIsQuerySessionInterrupted ( const std::string &  query_session,
mapd_shared_lock< mapd_shared_mutex > &  read_lock 
)

Definition at line 4125 of file Execute.cpp.

4127  {
4128  if (query_session.empty()) {
4129  return false;
4130  }
4131  auto flag_it = queries_interrupt_flag_.find(query_session);
4132  return !query_session.empty() && flag_it != queries_interrupt_flag_.end() &&
4133  flag_it->second;
4134 }
static InterruptFlagMap queries_interrupt_flag_
Definition: Execute.h:1075
bool Executor::checkIsRunningQuerySessionInterrupted ( )

Definition at line 4136 of file Execute.cpp.

4136  {
4137  mapd_shared_lock<mapd_shared_mutex> session_read_lock(executor_session_mutex_);
4138  return checkIsQuerySessionInterrupted(current_query_session_, session_read_lock);
4139 }
static mapd_shared_mutex executor_session_mutex_
Definition: Execute.h:1069
bool checkIsQuerySessionInterrupted(const std::string &query_session, mapd_shared_lock< mapd_shared_mutex > &read_lock)
Definition: Execute.cpp:4125
static QuerySessionId current_query_session_
Definition: Execute.h:1071
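
These checks all consult the same static bookkeeping: a map from QuerySessionId to an interrupt flag, read under a shared lock and written under an exclusive lock. A minimal standalone sketch of that pattern, with std::shared_mutex standing in for mapd_shared_mutex:

#include <map>
#include <shared_mutex>
#include <string>

// Sketch only: models queries_interrupt_flag_ and its locking discipline.
struct SessionInterruptRegistry {
  std::map<std::string, bool> interrupt_flag;
  mutable std::shared_mutex mutex;

  bool is_interrupted(const std::string& session) const {
    std::shared_lock<std::shared_mutex> read_lock(mutex);
    auto it = interrupt_flag.find(session);
    return !session.empty() && it != interrupt_flag.end() && it->second;
  }

  void interrupt(const std::string& session) {
    std::unique_lock<std::shared_mutex> write_lock(mutex);
    interrupt_flag[session] = true;  // a running query polls this flag
  }
};
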
void Executor::checkPendingQueryStatus ( const QuerySessionId query_session)

Definition at line 3869 of file Execute.cpp.

References ERR_INTERRUPTED, and VLOG.

3869  {
3870  // check whether we are okay to execute the "pending" query
3871  // i.e., before running the query check if this query session is "ALREADY" interrupted
3872  mapd_unique_lock<mapd_shared_mutex> session_write_lock(executor_session_mutex_);
3873  if (query_session.empty()) {
3874  return;
3875  }
3876  if (queries_interrupt_flag_.find(query_session) == queries_interrupt_flag_.end()) {
3877  // something went wrong: it is the caller's responsibility to call this
3878  // function only for an enrolled query session
3879  if (!queries_session_map_.count(query_session)) {
3880  VLOG(1) << "Interrupting pending query is not available since the query session is "
3881  "not enrolled";
3882  } else {
3883  // here the query session is enrolled but the interrupt flag is not registered
3884  VLOG(1)
3885  << "Interrupting pending query is not available since its interrupt flag is "
3886  "not registered";
3887  }
3888  return;
3889  }
3890  if (queries_interrupt_flag_[query_session]) {
3891  throw QueryExecutionError(ERR_INTERRUPTED);
3892  }
3893 }
static mapd_shared_mutex executor_session_mutex_
Definition: Execute.h:1069
static QuerySessionMap queries_session_map_
Definition: Execute.h:1077
static const int32_t ERR_INTERRUPTED
Definition: Execute.h:1117
static InterruptFlagMap queries_interrupt_flag_
Definition: Execute.h:1075
#define VLOG(n)
Definition: Logger.h:291
void Executor::clearMemory ( const Data_Namespace::MemoryLevel  memory_level)
static

Definition at line 183 of file Execute.cpp.

References Data_Namespace::DataMgr::clearMemory(), Data_Namespace::CPU_LEVEL, Catalog_Namespace::SysCatalog::getDataMgr(), Data_Namespace::GPU_LEVEL, Catalog_Namespace::SysCatalog::instance(), and CacheInvalidator< CACHE_HOLDING_TYPES >::invalidateCaches().

Referenced by DBHandler::clear_cpu_memory(), DBHandler::clear_gpu_memory(), QueryRunner::QueryRunner::clearCpuMemory(), and QueryRunner::QueryRunner::clearGpuMemory().

183  {
184  switch (memory_level) {
187  mapd_unique_lock<mapd_shared_mutex> flush_lock(
188  execute_mutex_); // Don't flush memory while queries are running
189 
191  if (memory_level == Data_Namespace::MemoryLevel::CPU_LEVEL) {
192  // The hash table cache uses CPU memory not managed by the buffer manager. In the
193  // future, we should manage these allocations with the buffer manager directly.
194  // For now, assume the user wants to purge the hash table cache when they clear
195  // CPU memory (currently used in ExecuteTest to lower memory pressure)
197  }
198  break;
199  }
200  default: {
201  throw std::runtime_error(
202  "Clearing memory levels other than the CPU level or GPU level is not "
203  "supported.");
204  }
205  }
206 }
static mapd_shared_mutex execute_mutex_
Definition: Execute.h:1084
void clearMemory(const MemoryLevel memLevel)
Definition: DataMgr.cpp:384
static void invalidateCaches()
Data_Namespace::DataMgr & getDataMgr() const
Definition: SysCatalog.h:193
static SysCatalog & instance()
Definition: SysCatalog.h:292
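
Usage is a single static call per memory level, as done by the callers listed above; for example:

// Frees CPU buffers (and purges the hash table cache), then GPU buffers.
Executor::clearMemory(Data_Namespace::MemoryLevel::CPU_LEVEL);
Executor::clearMemory(Data_Namespace::MemoryLevel::GPU_LEVEL);
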


void Executor::clearMetaInfoCache ( )
private

Definition at line 375 of file Execute.cpp.

References input_table_info_cache_().

375  {
376  input_table_info_cache_.clear();
377  agg_col_range_cache_.clear();
378  table_generations_.clear();
379 }
AggregatedColRange agg_col_range_cache_
Definition: Execute.h:1067
InputTableInfoCache input_table_info_cache_
Definition: Execute.h:1066
TableGenerations table_generations_
Definition: Execute.h:1068


void Executor::clearQuerySessionStatus ( const QuerySessionId query_session,
const std::string &  submitted_time_str,
const bool  acquire_spin_lock 
)

Definition at line 3895 of file Execute.cpp.

References executor_id_().

3897  {
3898  mapd_unique_lock<mapd_shared_mutex> session_write_lock(executor_session_mutex_);
3899  // clear the interrupt-related info for a finished query
3900  if (query_session.empty()) {
3901  return;
3902  }
3903  removeFromQuerySessionList(query_session, submitted_time_str, session_write_lock);
3904  if (query_session.compare(current_query_session_) == 0 &&
3905  running_query_executor_id_ == executor_id_) {
3906  invalidateRunningQuerySession(session_write_lock);
3907  if (acquire_spin_lock) {
3908  // try to unlock the executor's internal spin lock ("L") iff it was acquired;
3909  // otherwise we do not need to care about the "L" lock,
3910  // i.e., table import has no code path through the Executor,
3911  // so it only uses the executor's session management and the global interrupt
3912  // flag, never this "L" lock
3913  execute_spin_lock_.clear(std::memory_order_release);
3914  }
3915  resetInterrupt();
3916  }
3917 }
static mapd_shared_mutex executor_session_mutex_
Definition: Execute.h:1069
void invalidateRunningQuerySession(mapd_unique_lock< mapd_shared_mutex > &write_lock)
Definition: Execute.cpp:3846
static std::atomic_flag execute_spin_lock_
Definition: Execute.h:1080
const ExecutorId executor_id_
Definition: Execute.h:1053
static QuerySessionId current_query_session_
Definition: Execute.h:1071
void resetInterrupt()
bool removeFromQuerySessionList(const QuerySessionId &query_session, const std::string &submitted_time_str, mapd_unique_lock< mapd_shared_mutex > &write_lock)
Definition: Execute.cpp:4070
static size_t running_query_executor_id_
Definition: Execute.h:1073


llvm::Value * Executor::codegenAggregateWindowState ( )
private

Definition at line 326 of file WindowFunctionIR.cpp.

References AUTOMATIC_IR_METADATA, AVG, COUNT, anonymous_namespace{WindowFunctionIR.cpp}::get_adjusted_window_type_info(), get_int_type(), WindowProjectNodeContext::getActiveWindowFunctionContext(), Analyzer::WindowFunction::getKind(), kDECIMAL, kDOUBLE, and kFLOAT.

326  {
328  const auto pi32_type =
329  llvm::PointerType::get(get_int_type(32, cgen_state_->context_), 0);
330  const auto pi64_type =
331  llvm::PointerType::get(get_int_type(64, cgen_state_->context_), 0);
332  const auto window_func_context =
333  WindowProjectNodeContext::getActiveWindowFunctionContext(this);
334  const Analyzer::WindowFunction* window_func = window_func_context->getWindowFunction();
335  const auto window_func_ti = get_adjusted_window_type_info(window_func);
336  const auto aggregate_state_type =
337  window_func_ti.get_type() == kFLOAT ? pi32_type : pi64_type;
338  auto aggregate_state = aggregateWindowStatePtr();
339  if (window_func->getKind() == SqlWindowFunctionKind::AVG) {
340  const auto aggregate_state_count_i64 = cgen_state_->llInt(
341  reinterpret_cast<const int64_t>(window_func_context->aggregateStateCount()));
342  auto aggregate_state_count = cgen_state_->ir_builder_.CreateIntToPtr(
343  aggregate_state_count_i64, aggregate_state_type);
344  const auto double_null_lv = cgen_state_->inlineFpNull(SQLTypeInfo(kDOUBLE));
345  switch (window_func_ti.get_type()) {
346  case kFLOAT: {
347  return cgen_state_->emitCall(
348  "load_avg_float", {aggregate_state, aggregate_state_count, double_null_lv});
349  }
350  case kDOUBLE: {
351  return cgen_state_->emitCall(
352  "load_avg_double", {aggregate_state, aggregate_state_count, double_null_lv});
353  }
354  case kDECIMAL: {
355  return cgen_state_->emitCall(
356  "load_avg_decimal",
357  {aggregate_state,
358  aggregate_state_count,
359  double_null_lv,
360  cgen_state_->llInt<int32_t>(window_func_ti.get_scale())});
361  }
362  default: {
363  return cgen_state_->emitCall(
364  "load_avg_int", {aggregate_state, aggregate_state_count, double_null_lv});
365  }
366  }
367  }
368  if (window_func->getKind() == SqlWindowFunctionKind::COUNT) {
369  return cgen_state_->ir_builder_.CreateLoad(aggregate_state);
370  }
371  switch (window_func_ti.get_type()) {
372  case kFLOAT: {
373  return cgen_state_->emitCall("load_float", {aggregate_state});
374  }
375  case kDOUBLE: {
376  return cgen_state_->emitCall("load_double", {aggregate_state});
377  }
378  default: {
379  return cgen_state_->ir_builder_.CreateLoad(aggregate_state);
380  }
381  }
382 }
SqlWindowFunctionKind getKind() const
Definition: Analyzer.h:1447
llvm::Value * aggregateWindowStatePtr()
std::unique_ptr< CgenState > cgen_state_
Definition: Execute.h:1010
llvm::Type * get_int_type(const int width, llvm::LLVMContext &context)
static WindowFunctionContext * getActiveWindowFunctionContext(Executor *executor)
#define AUTOMATIC_IR_METADATA(CGENSTATE)
SQLTypeInfo get_adjusted_window_type_info(const Analyzer::WindowFunction *window_func)
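
For AVG the generated code does not compute the quotient itself; it calls one of the load_avg_* runtime helpers with the sum state, the count state, and the null sentinel. A sketch of the behaviour assumed of those helpers (an expectation about the runtime library, not its actual source or signature):

#include <cstdint>

// Sketch only: expected semantics of load_avg_double(sum_state, count_state, null_val).
double load_avg_double_sketch(const double* sum_state,
                              const int64_t* count_state,
                              double null_val) {
  return *count_state == 0 ? null_val
                           : *sum_state / static_cast<double>(*count_state);
}
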


void Executor::codegenJoinLoops ( const std::vector< JoinLoop > &  join_loops,
const RelAlgExecutionUnit ra_exe_unit,
GroupByAndAggregate group_by_and_aggregate,
llvm::Function *  query_func,
llvm::BasicBlock *  entry_bb,
const QueryMemoryDescriptor query_mem_desc,
const CompilationOptions co,
const ExecutionOptions eo 
)
private

Definition at line 825 of file IRCodegen.cpp.

References ExecutionOptions::allow_runtime_query_interrupt, AUTOMATIC_IR_METADATA, CodeGenerator::cgen_state_, JoinLoop::codegen(), CgenState::context_, CgenState::current_func_, CompilationOptions::device_type, CgenState::ir_builder_, CgenState::llInt(), CgenState::needs_error_check_, CodeGenerator::posArg(), GroupByAndAggregate::query_infos_, and ExecutionOptions::with_dynamic_watchdog.

832  {
834  const auto exit_bb =
835  llvm::BasicBlock::Create(cgen_state_->context_, "exit", cgen_state_->current_func_);
836  cgen_state_->ir_builder_.SetInsertPoint(exit_bb);
837  cgen_state_->ir_builder_.CreateRet(cgen_state_->llInt<int32_t>(0));
838  cgen_state_->ir_builder_.SetInsertPoint(entry_bb);
839  CodeGenerator code_generator(this);
840  const auto loops_entry_bb = JoinLoop::codegen(
841  join_loops,
842  /*body_codegen=*/
843  [this,
844  query_func,
845  &query_mem_desc,
846  &co,
847  &eo,
848  &group_by_and_aggregate,
849  &join_loops,
850  &ra_exe_unit](const std::vector<llvm::Value*>& prev_iters) {
852  addJoinLoopIterator(prev_iters, join_loops.size());
853  auto& builder = cgen_state_->ir_builder_;
854  const auto loop_body_bb = llvm::BasicBlock::Create(
855  builder.getContext(), "loop_body", builder.GetInsertBlock()->getParent());
856  builder.SetInsertPoint(loop_body_bb);
857  const bool can_return_error =
858  compileBody(ra_exe_unit, group_by_and_aggregate, query_mem_desc, co);
859  if (can_return_error || cgen_state_->needs_error_check_ ||
860  eo.with_dynamic_watchdog || eo.allow_runtime_query_interrupt) {
861  createErrorCheckControlFlow(query_func,
862  eo.with_dynamic_watchdog,
863  eo.allow_runtime_query_interrupt,
864  co.device_type,
865  group_by_and_aggregate.query_infos_);
866  }
867  return loop_body_bb;
868  },
869  /*outer_iter=*/code_generator.posArg(nullptr),
870  exit_bb,
871  cgen_state_.get());
872  cgen_state_->ir_builder_.SetInsertPoint(entry_bb);
873  cgen_state_->ir_builder_.CreateBr(loops_entry_bb);
874 }
std::unique_ptr< CgenState > cgen_state_
Definition: Execute.h:1010
bool compileBody(const RelAlgExecutionUnit &ra_exe_unit, GroupByAndAggregate &group_by_and_aggregate, const QueryMemoryDescriptor &query_mem_desc, const CompilationOptions &co, const GpuSharedMemoryContext &gpu_smem_context={})
static llvm::BasicBlock * codegen(const std::vector< JoinLoop > &join_loops, const std::function< llvm::BasicBlock *(const std::vector< llvm::Value * > &)> &body_codegen, llvm::Value *outer_iter, llvm::BasicBlock *exit_bb, CgenState *cgen_state)
Definition: JoinLoop.cpp:48
#define AUTOMATIC_IR_METADATA(CGENSTATE)
ExecutorDeviceType device_type
const std::vector< InputTableInfo > & query_infos_
llvm::Value * addJoinLoopIterator(const std::vector< llvm::Value * > &prev_iters, const size_t level_idx)
Definition: IRCodegen.cpp:807
void createErrorCheckControlFlow(llvm::Function *query_func, bool run_with_dynamic_watchdog, bool run_with_allowing_runtime_interrupt, ExecutorDeviceType device_type, const std::vector< InputTableInfo > &input_table_infos)


llvm::BasicBlock * Executor::codegenSkipDeletedOuterTableRow ( const RelAlgExecutionUnit ra_exe_unit,
const CompilationOptions co 
)
private

Definition at line 2951 of file NativeCodegen.cpp.

2953  {
2955  if (!co.filter_on_deleted_column) {
2956  return nullptr;
2957  }
2958  CHECK(!ra_exe_unit.input_descs.empty());
2959  const auto& outer_input_desc = ra_exe_unit.input_descs[0];
2960  if (outer_input_desc.getSourceType() != InputSourceType::TABLE) {
2961  return nullptr;
2962  }
2963  const auto deleted_cd =
2964  plan_state_->getDeletedColForTable(outer_input_desc.getTableId());
2965  if (!deleted_cd) {
2966  return nullptr;
2967  }
2968  CHECK(deleted_cd->columnType.is_boolean());
2969  const auto deleted_expr =
2970  makeExpr<Analyzer::ColumnVar>(deleted_cd->columnType,
2971  outer_input_desc.getTableId(),
2972  deleted_cd->columnId,
2973  outer_input_desc.getNestLevel());
2974  CodeGenerator code_generator(this);
2975  const auto is_deleted =
2976  code_generator.toBool(code_generator.codegen(deleted_expr.get(), true, co).front());
2977  const auto is_deleted_bb = llvm::BasicBlock::Create(
2978  cgen_state_->context_, "is_deleted", cgen_state_->row_func_);
2979  llvm::BasicBlock* bb = llvm::BasicBlock::Create(
2980  cgen_state_->context_, "is_not_deleted", cgen_state_->row_func_);
2981  cgen_state_->ir_builder_.CreateCondBr(is_deleted, is_deleted_bb, bb);
2982  cgen_state_->ir_builder_.SetInsertPoint(is_deleted_bb);
2983  cgen_state_->ir_builder_.CreateRet(cgen_state_->llInt<int32_t>(0));
2984  cgen_state_->ir_builder_.SetInsertPoint(bb);
2985  return bb;
2986 }
std::vector< InputDescriptor > input_descs
std::unique_ptr< CgenState > cgen_state_
Definition: Execute.h:1010
std::unique_ptr< PlanState > plan_state_
Definition: Execute.h:1025
#define AUTOMATIC_IR_METADATA(CGENSTATE)
#define CHECK(condition)
Definition: Logger.h:197
void Executor::codegenWindowAvgEpilogue ( llvm::Value *  crt_val,
llvm::Value *  window_func_null_val,
llvm::Value *  multiplicity_lv 
)
private

Definition at line 289 of file WindowFunctionIR.cpp.

References AUTOMATIC_IR_METADATA, anonymous_namespace{WindowFunctionIR.cpp}::get_adjusted_window_type_info(), get_int_type(), WindowProjectNodeContext::getActiveWindowFunctionContext(), kDOUBLE, and kFLOAT.

291  {
293  const auto window_func_context =
294  WindowProjectNodeContext::getActiveWindowFunctionContext(this);
295  const auto window_func = window_func_context->getWindowFunction();
296  const auto window_func_ti = get_adjusted_window_type_info(window_func);
297  const auto pi32_type =
298  llvm::PointerType::get(get_int_type(32, cgen_state_->context_), 0);
299  const auto pi64_type =
300  llvm::PointerType::get(get_int_type(64, cgen_state_->context_), 0);
301  const auto aggregate_state_type =
302  window_func_ti.get_type() == kFLOAT ? pi32_type : pi64_type;
303  const auto aggregate_state_count_i64 = cgen_state_->llInt(
304  reinterpret_cast<const int64_t>(window_func_context->aggregateStateCount()));
305  auto aggregate_state_count = cgen_state_->ir_builder_.CreateIntToPtr(
306  aggregate_state_count_i64, aggregate_state_type);
307  std::string agg_count_func_name = "agg_count";
308  switch (window_func_ti.get_type()) {
309  case kFLOAT: {
310  agg_count_func_name += "_float";
311  break;
312  }
313  case kDOUBLE: {
314  agg_count_func_name += "_double";
315  break;
316  }
317  default: {
318  break;
319  }
320  }
321  agg_count_func_name += "_skip_val";
322  cgen_state_->emitCall(agg_count_func_name,
323  {aggregate_state_count, crt_val, window_func_null_val});
324 }
std::unique_ptr< CgenState > cgen_state_
Definition: Execute.h:1010
llvm::Type * get_int_type(const int width, llvm::LLVMContext &context)
static WindowFunctionContext * getActiveWindowFunctionContext(Executor *executor)
#define AUTOMATIC_IR_METADATA(CGENSTATE)
SQLTypeInfo get_adjusted_window_type_info(const Analyzer::WindowFunction *window_func)
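
The epilogue increments the AVG denominator through an agg_count_*_skip_val runtime call, so null inputs do not contribute to the count. A sketch of the behaviour assumed of that helper (not the runtime library's actual code):

#include <cstdint>

// Sketch only: expected semantics of agg_count_double_skip_val(count, value, skip_val).
void agg_count_double_skip_val_sketch(int64_t* count, double value, double skip_val) {
  if (value != skip_val) {
    ++*count;  // only non-null rows contribute to the AVG denominator
  }
}
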


llvm::Value * Executor::codegenWindowFunction ( const size_t  target_index,
const CompilationOptions co 
)
private

Definition at line 21 of file WindowFunctionIR.cpp.

References WindowProjectNodeContext::activateWindowFunctionContext(), run_benchmark_import::args, AUTOMATIC_IR_METADATA, AVG, CHECK, CHECK_EQ, COUNT, CUME_DIST, DENSE_RANK, logger::FATAL, FIRST_VALUE, WindowProjectNodeContext::get(), WindowFunctionContext::getWindowFunction(), LAG, LAST_VALUE, LEAD, LOG, MAX, MIN, NTILE, PERCENT_RANK, RANK, ROW_NUMBER, and SUM.

22  {
24  CodeGenerator code_generator(this);
25  const auto window_func_context =
27  target_index);
28  const auto window_func = window_func_context->getWindowFunction();
29  switch (window_func->getKind()) {
34  return cgen_state_->emitCall("row_number_window_func",
35  {cgen_state_->llInt(reinterpret_cast<const int64_t>(
36  window_func_context->output())),
37  code_generator.posArg(nullptr)});
38  }
41  return cgen_state_->emitCall("percent_window_func",
42  {cgen_state_->llInt(reinterpret_cast<const int64_t>(
43  window_func_context->output())),
44  code_generator.posArg(nullptr)});
45  }
51  const auto& args = window_func->getArgs();
52  CHECK(!args.empty());
53  const auto arg_lvs = code_generator.codegen(args.front().get(), true, co);
54  CHECK_EQ(arg_lvs.size(), size_t(1));
55  return arg_lvs.front();
56  }
63  }
64  default: {
65  LOG(FATAL) << "Invalid window function kind";
66  }
67  }
68  return nullptr;
69 }
#define CHECK_EQ(x, y)
Definition: Logger.h:205
#define LOG(tag)
Definition: Logger.h:188
std::unique_ptr< CgenState > cgen_state_
Definition: Execute.h:1010
static const WindowProjectNodeContext * get(Executor *executor)
const WindowFunctionContext * activateWindowFunctionContext(Executor *executor, const size_t target_index) const
#define AUTOMATIC_IR_METADATA(CGENSTATE)
llvm::Value * codegenWindowFunctionAggregate(const CompilationOptions &co)
#define CHECK(condition)
Definition: Logger.h:197
const Analyzer::WindowFunction * getWindowFunction() const


llvm::Value * Executor::codegenWindowFunctionAggregate ( const CompilationOptions co)
private

Definition at line 140 of file WindowFunctionIR.cpp.

References AUTOMATIC_IR_METADATA, AVG, CHECK, WindowProjectNodeContext::get(), get_int_type(), and WindowProjectNodeContext::getActiveWindowFunctionContext().

140  {
142  const auto reset_state_false_bb = codegenWindowResetStateControlFlow();
143  auto aggregate_state = aggregateWindowStatePtr();
144  llvm::Value* aggregate_state_count = nullptr;
145  const auto window_func_context =
146  WindowProjectNodeContext::getActiveWindowFunctionContext(this);
147  const auto window_func = window_func_context->getWindowFunction();
148  if (window_func->getKind() == SqlWindowFunctionKind::AVG) {
149  const auto aggregate_state_count_i64 = cgen_state_->llInt(
150  reinterpret_cast<const int64_t>(window_func_context->aggregateStateCount()));
151  const auto pi64_type =
152  llvm::PointerType::get(get_int_type(64, cgen_state_->context_), 0);
153  aggregate_state_count =
154  cgen_state_->ir_builder_.CreateIntToPtr(aggregate_state_count_i64, pi64_type);
155  }
156  codegenWindowFunctionStateInit(aggregate_state);
157  if (window_func->getKind() == SqlWindowFunctionKind::AVG) {
158  const auto count_zero = cgen_state_->llInt(int64_t(0));
159  cgen_state_->emitCall("agg_id", {aggregate_state_count, count_zero});
160  }
161  cgen_state_->ir_builder_.CreateBr(reset_state_false_bb);
162  cgen_state_->ir_builder_.SetInsertPoint(reset_state_false_bb);
164  return codegenWindowFunctionAggregateCalls(aggregate_state, co);
165 }
llvm::Value * aggregateWindowStatePtr()
std::unique_ptr< CgenState > cgen_state_
Definition: Execute.h:1010
llvm::Type * get_int_type(const int width, llvm::LLVMContext &context)
static WindowFunctionContext * getActiveWindowFunctionContext(Executor *executor)
static const WindowProjectNodeContext * get(Executor *executor)
#define AUTOMATIC_IR_METADATA(CGENSTATE)
void codegenWindowFunctionStateInit(llvm::Value *aggregate_state)
#define CHECK(condition)
Definition: Logger.h:197
llvm::Value * codegenWindowFunctionAggregateCalls(llvm::Value *aggregate_state, const CompilationOptions &co)
llvm::BasicBlock * codegenWindowResetStateControlFlow()


llvm::Value * Executor::codegenWindowFunctionAggregateCalls ( llvm::Value *  aggregate_state,
const CompilationOptions co 
)
private

Definition at line 246 of file WindowFunctionIR.cpp.

References run_benchmark_import::args, AUTOMATIC_IR_METADATA, AVG, CHECK, CHECK_EQ, CodeGenerator::codegen(), CodeGenerator::codegenCastBetweenIntTypes(), COUNT, anonymous_namespace{WindowFunctionIR.cpp}::get_adjusted_window_type_info(), anonymous_namespace{WindowFunctionIR.cpp}::get_window_agg_name(), WindowProjectNodeContext::getActiveWindowFunctionContext(), kFLOAT, and SUM.

247  {
249  const auto window_func_context =
250  WindowProjectNodeContext::getActiveWindowFunctionContext(this);
251  const auto window_func = window_func_context->getWindowFunction();
252  const auto window_func_ti = get_adjusted_window_type_info(window_func);
253  const auto window_func_null_val =
254  window_func_ti.is_fp()
255  ? cgen_state_->inlineFpNull(window_func_ti)
256  : cgen_state_->castToTypeIn(cgen_state_->inlineIntNull(window_func_ti), 64);
257  const auto& args = window_func->getArgs();
258  llvm::Value* crt_val;
259  if (args.empty()) {
260  CHECK(window_func->getKind() == SqlWindowFunctionKind::COUNT);
261  crt_val = cgen_state_->llInt(int64_t(1));
262  } else {
263  CodeGenerator code_generator(this);
264  const auto arg_lvs = code_generator.codegen(args.front().get(), true, co);
265  CHECK_EQ(arg_lvs.size(), size_t(1));
266  if (window_func->getKind() == SqlWindowFunctionKind::SUM && !window_func_ti.is_fp()) {
267  crt_val = code_generator.codegenCastBetweenIntTypes(
268  arg_lvs.front(), args.front()->get_type_info(), window_func_ti, false);
269  } else {
270  crt_val = window_func_ti.get_type() == kFLOAT
271  ? arg_lvs.front()
272  : cgen_state_->castToTypeIn(arg_lvs.front(), 64);
273  }
274  }
275  const auto agg_name = get_window_agg_name(window_func->getKind(), window_func_ti);
276  llvm::Value* multiplicity_lv = nullptr;
277  if (args.empty()) {
278  cgen_state_->emitCall(agg_name, {aggregate_state, crt_val});
279  } else {
280  cgen_state_->emitCall(agg_name + "_skip_val",
281  {aggregate_state, crt_val, window_func_null_val});
282  }
283  if (window_func->getKind() == SqlWindowFunctionKind::AVG) {
284  codegenWindowAvgEpilogue(crt_val, window_func_null_val, multiplicity_lv);
285  }
286  return codegenAggregateWindowState();
287 }
#define CHECK_EQ(x, y)
Definition: Logger.h:205
std::unique_ptr< CgenState > cgen_state_
Definition: Execute.h:1010
static WindowFunctionContext * getActiveWindowFunctionContext(Executor *executor)
std::string get_window_agg_name(const SqlWindowFunctionKind kind, const SQLTypeInfo &window_func_ti)
void codegenWindowAvgEpilogue(llvm::Value *crt_val, llvm::Value *window_func_null_val, llvm::Value *multiplicity_lv)
#define AUTOMATIC_IR_METADATA(CGENSTATE)
llvm::Value * codegenAggregateWindowState()
#define CHECK(condition)
Definition: Logger.h:197
SQLTypeInfo get_adjusted_window_type_info(const Analyzer::WindowFunction *window_func)


void Executor::codegenWindowFunctionStateInit ( llvm::Value *  aggregate_state)
private

Definition at line 196 of file WindowFunctionIR.cpp.

References AUTOMATIC_IR_METADATA, COUNT, anonymous_namespace{WindowFunctionIR.cpp}::get_adjusted_window_type_info(), get_int_type(), WindowProjectNodeContext::getActiveWindowFunctionContext(), kDOUBLE, and kFLOAT.

196  {
198  const auto window_func_context =
199  WindowProjectNodeContext::getActiveWindowFunctionContext(this);
200  const auto window_func = window_func_context->getWindowFunction();
201  const auto window_func_ti = get_adjusted_window_type_info(window_func);
202  const auto window_func_null_val =
203  window_func_ti.is_fp()
204  ? cgen_state_->inlineFpNull(window_func_ti)
205  : cgen_state_->castToTypeIn(cgen_state_->inlineIntNull(window_func_ti), 64);
206  llvm::Value* window_func_init_val;
207  if (window_func_context->getWindowFunction()->getKind() ==
209  switch (window_func_ti.get_type()) {
210  case kFLOAT: {
211  window_func_init_val = cgen_state_->llFp(float(0));
212  break;
213  }
214  case kDOUBLE: {
215  window_func_init_val = cgen_state_->llFp(double(0));
216  break;
217  }
218  default: {
219  window_func_init_val = cgen_state_->llInt(int64_t(0));
220  break;
221  }
222  }
223  } else {
224  window_func_init_val = window_func_null_val;
225  }
226  const auto pi32_type =
227  llvm::PointerType::get(get_int_type(32, cgen_state_->context_), 0);
228  switch (window_func_ti.get_type()) {
229  case kDOUBLE: {
230  cgen_state_->emitCall("agg_id_double", {aggregate_state, window_func_init_val});
231  break;
232  }
233  case kFLOAT: {
234  aggregate_state =
235  cgen_state_->ir_builder_.CreateBitCast(aggregate_state, pi32_type);
236  cgen_state_->emitCall("agg_id_float", {aggregate_state, window_func_init_val});
237  break;
238  }
239  default: {
240  cgen_state_->emitCall("agg_id", {aggregate_state, window_func_init_val});
241  break;
242  }
243  }
244 }
std::unique_ptr< CgenState > cgen_state_
Definition: Execute.h:1010
llvm::Type * get_int_type(const int width, llvm::LLVMContext &context)
static WindowFunctionContext * getActiveWindowFunctionContext(Executor *executor)
#define AUTOMATIC_IR_METADATA(CGENSTATE)
SQLTypeInfo get_adjusted_window_type_info(const Analyzer::WindowFunction *window_func)


llvm::BasicBlock * Executor::codegenWindowResetStateControlFlow ( )
private

Definition at line 167 of file WindowFunctionIR.cpp.

References AUTOMATIC_IR_METADATA, WindowProjectNodeContext::getActiveWindowFunctionContext(), CodeGenerator::posArg(), and CodeGenerator::toBool().

167  {
169  const auto window_func_context =
170  WindowProjectNodeContext::getActiveWindowFunctionContext(this);
171  const auto bitset = cgen_state_->llInt(
172  reinterpret_cast<const int64_t>(window_func_context->partitionStart()));
173  const auto min_val = cgen_state_->llInt(int64_t(0));
174  const auto max_val = cgen_state_->llInt(window_func_context->elementCount() - 1);
175  const auto null_val = cgen_state_->llInt(inline_int_null_value<int64_t>());
176  const auto null_bool_val = cgen_state_->llInt<int8_t>(inline_int_null_value<int8_t>());
177  CodeGenerator code_generator(this);
178  const auto reset_state =
179  code_generator.toBool(cgen_state_->emitCall("bit_is_set",
180  {bitset,
181  code_generator.posArg(nullptr),
182  min_val,
183  max_val,
184  null_val,
185  null_bool_val}));
186  const auto reset_state_true_bb = llvm::BasicBlock::Create(
187  cgen_state_->context_, "reset_state.true", cgen_state_->current_func_);
188  const auto reset_state_false_bb = llvm::BasicBlock::Create(
189  cgen_state_->context_, "reset_state.false", cgen_state_->current_func_);
190  cgen_state_->ir_builder_.CreateCondBr(
191  reset_state, reset_state_true_bb, reset_state_false_bb);
192  cgen_state_->ir_builder_.SetInsertPoint(reset_state_true_bb);
193  return reset_state_false_bb;
194 }
std::unique_ptr< CgenState > cgen_state_
Definition: Execute.h:1010
static WindowFunctionContext * getActiveWindowFunctionContext(Executor *executor)
#define AUTOMATIC_IR_METADATA(CGENSTATE)
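
The emitted bit_is_set check decides, per row, whether the current position opens a new partition; only then does control fall into the reset_state.true block before rejoining the aggregation path. A standalone sketch of that per-row flow, with a plain byte vector standing in for the partition-start bitset:

#include <cstdint>
#include <vector>

// Sketch only: reset the running state at partition boundaries, then accumulate.
bool bit_is_set_sketch(const std::vector<uint8_t>& bitset, int64_t pos) {
  return (bitset[pos >> 3] >> (pos & 7)) & 1;
}

void process_row_sketch(const std::vector<uint8_t>& partition_start,
                        int64_t pos,
                        double& state,
                        double init_val,
                        double value) {
  if (bit_is_set_sketch(partition_start, pos)) {
    state = init_val;  // reset_state.true: re-initialize the aggregate state
  }
  state += value;  // reset_state.false: the aggregate update calls
}
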


ResultSetPtr Executor::collectAllDeviceResults ( SharedKernelContext shared_context,
const RelAlgExecutionUnit ra_exe_unit,
const QueryMemoryDescriptor query_mem_desc,
const ExecutorDeviceType  device_type,
std::shared_ptr< RowSetMemoryOwner row_set_mem_owner 
)
private

Definition at line 1853 of file Execute.cpp.

References anonymous_namespace{Execute.cpp}::build_row_for_empty_input(), catalog_(), DEBUG_TIMER, SharedKernelContext::getFragmentResults(), QueryMemoryDescriptor::getQueryDescriptionType(), GPU, NonGroupedAggregate, GroupByAndAggregate::shard_count_for_top_groups(), RelAlgExecutionUnit::target_exprs, and use_speculative_top_n().

1858  {
1859  auto timer = DEBUG_TIMER(__func__);
1860  auto& result_per_device = shared_context.getFragmentResults();
1861  if (result_per_device.empty() && query_mem_desc.getQueryDescriptionType() ==
1862  QueryDescriptionType::NonGroupedAggregate) {
1863  return build_row_for_empty_input(
1864  ra_exe_unit.target_exprs, query_mem_desc, device_type);
1865  }
1866  if (use_speculative_top_n(ra_exe_unit, query_mem_desc)) {
1867  try {
1868  return reduceSpeculativeTopN(
1869  ra_exe_unit, result_per_device, row_set_mem_owner, query_mem_desc);
1870  } catch (const std::bad_alloc&) {
1871  throw SpeculativeTopNFailed("Failed during multi-device reduction.");
1872  }
1873  }
1874  const auto shard_count =
1875  device_type == ExecutorDeviceType::GPU
1876  ? GroupByAndAggregate::shard_count_for_top_groups(ra_exe_unit, *catalog_)
1877  : 0;
1878 
1879  if (shard_count && !result_per_device.empty()) {
1880  return collectAllDeviceShardedTopResults(shared_context, ra_exe_unit);
1881  }
1882  return reduceMultiDeviceResults(
1883  ra_exe_unit, result_per_device, row_set_mem_owner, query_mem_desc);
1884 }
std::vector< Analyzer::Expr * > target_exprs
bool use_speculative_top_n(const RelAlgExecutionUnit &ra_exe_unit, const QueryMemoryDescriptor &query_mem_desc)
ResultSetPtr reduceSpeculativeTopN(const RelAlgExecutionUnit &, std::vector< std::pair< ResultSetPtr, std::vector< size_t >>> &all_fragment_results, std::shared_ptr< RowSetMemoryOwner >, const QueryMemoryDescriptor &) const
Definition: Execute.cpp:1005
const Catalog_Namespace::Catalog * catalog_
Definition: Execute.h:1054
ResultSetPtr reduceMultiDeviceResults(const RelAlgExecutionUnit &, std::vector< std::pair< ResultSetPtr, std::vector< size_t >>> &all_fragment_results, std::shared_ptr< RowSetMemoryOwner >, const QueryMemoryDescriptor &) const
Definition: Execute.cpp:898
ResultSetPtr collectAllDeviceShardedTopResults(SharedKernelContext &shared_context, const RelAlgExecutionUnit &ra_exe_unit) const
Definition: Execute.cpp:1968
QueryDescriptionType getQueryDescriptionType() const
ResultSetPtr build_row_for_empty_input(const std::vector< Analyzer::Expr * > &target_exprs_in, const QueryMemoryDescriptor &query_mem_desc, const ExecutorDeviceType device_type)
Definition: Execute.cpp:1811
std::vector< std::pair< ResultSetPtr, std::vector< size_t > > > & getFragmentResults()
#define DEBUG_TIMER(name)
Definition: Logger.h:313
static size_t shard_count_for_top_groups(const RelAlgExecutionUnit &ra_exe_unit, const Catalog_Namespace::Catalog &catalog)
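
The reduction path is chosen by a small decision chain over the result shape and the device type. A sketch of that dispatch, with hypothetical booleans standing in for the conditions tested above:

// Sketch only: which reduction routine handles the per-device results.
enum class ReductionPath { EmptyInputRow, SpeculativeTopN, ShardedTopN, MultiDevice };

ReductionPath pick_reduction_path_sketch(bool results_empty,
                                         bool non_grouped_aggregate,
                                         bool speculative_top_n,
                                         bool gpu_sharded_top_groups) {
  if (results_empty && non_grouped_aggregate) {
    return ReductionPath::EmptyInputRow;    // build_row_for_empty_input()
  }
  if (speculative_top_n) {
    return ReductionPath::SpeculativeTopN;  // reduceSpeculativeTopN()
  }
  if (gpu_sharded_top_groups) {
    return ReductionPath::ShardedTopN;      // collectAllDeviceShardedTopResults()
  }
  return ReductionPath::MultiDevice;        // reduceMultiDeviceResults()
}
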


ResultSetPtr Executor::collectAllDeviceShardedTopResults ( SharedKernelContext shared_context,
const RelAlgExecutionUnit ra_exe_unit 
) const
private

Definition at line 1968 of file Execute.cpp.

References catalog_(), CHECK, CHECK_EQ, CHECK_LE, SharedKernelContext::getFragmentResults(), SortInfo::limit, SortInfo::offset, SortInfo::order_entries, anonymous_namespace{Execute.cpp}::permute_storage_columnar(), anonymous_namespace{Execute.cpp}::permute_storage_row_wise(), run_benchmark_import::result, and RelAlgExecutionUnit::sort_info.

1970  {
1971  auto& result_per_device = shared_context.getFragmentResults();
1972  const auto first_result_set = result_per_device.front().first;
1973  CHECK(first_result_set);
1974  auto top_query_mem_desc = first_result_set->getQueryMemDesc();
1975  CHECK(!top_query_mem_desc.hasInterleavedBinsOnGpu());
1976  const auto top_n = ra_exe_unit.sort_info.limit + ra_exe_unit.sort_info.offset;
1977  top_query_mem_desc.setEntryCount(0);
1978  for (auto& result : result_per_device) {
1979  const auto result_set = result.first;
1980  CHECK(result_set);
1981  result_set->sort(ra_exe_unit.sort_info.order_entries, top_n, this);
1982  size_t new_entry_cnt = top_query_mem_desc.getEntryCount() + result_set->rowCount();
1983  top_query_mem_desc.setEntryCount(new_entry_cnt);
1984  }
1985  auto top_result_set = std::make_shared<ResultSet>(first_result_set->getTargetInfos(),
1986  first_result_set->getDeviceType(),
1987  top_query_mem_desc,
1988  first_result_set->getRowSetMemOwner(),
1989  catalog_,
1990  blockSize(),
1991  gridSize());
1992  auto top_storage = top_result_set->allocateStorage();
1993  size_t top_output_row_idx{0};
1994  for (auto& result : result_per_device) {
1995  const auto result_set = result.first;
1996  CHECK(result_set);
1997  const auto& top_permutation = result_set->getPermutationBuffer();
1998  CHECK_LE(top_permutation.size(), top_n);
1999  if (top_query_mem_desc.didOutputColumnar()) {
2000  top_output_row_idx = permute_storage_columnar(result_set->getStorage(),
2001  result_set->getQueryMemDesc(),
2002  top_storage,
2003  top_output_row_idx,
2004  top_query_mem_desc,
2005  top_permutation);
2006  } else {
2007  top_output_row_idx = permute_storage_row_wise(result_set->getStorage(),
2008  top_storage,
2009  top_output_row_idx,
2010  top_query_mem_desc,
2011  top_permutation);
2012  }
2013  }
2014  CHECK_EQ(top_output_row_idx, top_query_mem_desc.getEntryCount());
2015  return top_result_set;
2016 }
#define CHECK_EQ(x, y)
Definition: Logger.h:205
const std::list< Analyzer::OrderEntry > order_entries
size_t permute_storage_row_wise(const ResultSetStorage *input_storage, const ResultSetStorage *output_storage, size_t output_row_index, const QueryMemoryDescriptor &output_query_mem_desc, const std::vector< uint32_t > &top_permutation)
Definition: Execute.cpp:1947
const size_t limit
const SortInfo sort_info
const Catalog_Namespace::Catalog * catalog_
Definition: Execute.h:1054
#define CHECK_LE(x, y)
Definition: Logger.h:208
unsigned gridSize() const
Definition: Execute.cpp:3322
size_t permute_storage_columnar(const ResultSetStorage *input_storage, const QueryMemoryDescriptor &input_query_mem_desc, const ResultSetStorage *output_storage, size_t output_row_index, const QueryMemoryDescriptor &output_query_mem_desc, const std::vector< uint32_t > &top_permutation)
Definition: Execute.cpp:1897
std::vector< std::pair< ResultSetPtr, std::vector< size_t > > > & getFragmentResults()
#define CHECK(condition)
Definition: Logger.h:197
unsigned blockSize() const
Definition: Execute.cpp:3339
const size_t offset
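
Each device result is sorted and truncated to limit + offset locally, and the kept rows are then appended (via the permutation buffer) into a single output result set. The same strategy on plain vectors, as a sketch:

#include <algorithm>
#include <cstddef>
#include <vector>

// Sketch only: per-device sort/truncate, then concatenate, mirroring the loop above.
std::vector<int> merge_sharded_top_n_sketch(std::vector<std::vector<int>> per_device,
                                            std::size_t limit, std::size_t offset) {
  const std::size_t top_n = limit + offset;
  std::vector<int> merged;
  for (auto& rows : per_device) {
    const std::size_t keep = std::min(top_n, rows.size());
    std::partial_sort(rows.begin(), rows.begin() + keep, rows.end());
    merged.insert(merged.end(), rows.begin(), rows.begin() + keep);
  }
  return merged;  // callers still apply the final sort/limit over the merged rows
}
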


bool Executor::compileBody ( const RelAlgExecutionUnit ra_exe_unit,
GroupByAndAggregate group_by_and_aggregate,
const QueryMemoryDescriptor query_mem_desc,
const CompilationOptions co,
const GpuSharedMemoryContext gpu_smem_context = {} 
)
private

Definition at line 2988 of file NativeCodegen.cpp.

2992  {
2994 
2995  // Switch the code generation into a separate filter function if enabled.
2996  // Note that accesses to function arguments are still codegenned from the
2997  // row function's arguments, then later automatically forwarded and
2998  // remapped into filter function arguments by redeclareFilterFunction().
2999  cgen_state_->row_func_bb_ = cgen_state_->ir_builder_.GetInsertBlock();
3000  llvm::Value* loop_done{nullptr};
3001  std::unique_ptr<Executor::FetchCacheAnchor> fetch_cache_anchor;
3002  if (cgen_state_->filter_func_) {
3003  if (cgen_state_->row_func_bb_->getName() == "loop_body") {
3004  auto row_func_entry_bb = &cgen_state_->row_func_->getEntryBlock();
3005  cgen_state_->ir_builder_.SetInsertPoint(row_func_entry_bb,
3006  row_func_entry_bb->begin());
3007  loop_done = cgen_state_->ir_builder_.CreateAlloca(
3008  get_int_type(1, cgen_state_->context_), nullptr, "loop_done");
3009  cgen_state_->ir_builder_.SetInsertPoint(cgen_state_->row_func_bb_);
3010  cgen_state_->ir_builder_.CreateStore(cgen_state_->llBool(true), loop_done);
3011  }
3012  cgen_state_->ir_builder_.SetInsertPoint(cgen_state_->filter_func_bb_);
3013  cgen_state_->current_func_ = cgen_state_->filter_func_;
3014  fetch_cache_anchor = std::make_unique<Executor::FetchCacheAnchor>(cgen_state_.get());
3015  }
3016 
3017  // generate the code for the filter
3018  std::vector<Analyzer::Expr*> primary_quals;
3019  std::vector<Analyzer::Expr*> deferred_quals;
3020  bool short_circuited = CodeGenerator::prioritizeQuals(
3021  ra_exe_unit, primary_quals, deferred_quals, plan_state_->hoisted_filters_);
3022  if (short_circuited) {
3023  VLOG(1) << "Prioritized " << std::to_string(primary_quals.size()) << " quals, "
3024  << "short-circuited and deferred " << std::to_string(deferred_quals.size())
3025  << " quals";
3026  }
3027  llvm::Value* filter_lv = cgen_state_->llBool(true);
3028  CodeGenerator code_generator(this);
3029  for (auto expr : primary_quals) {
3030  // Generate the filter for primary quals
3031  auto cond = code_generator.toBool(code_generator.codegen(expr, true, co).front());
3032  filter_lv = cgen_state_->ir_builder_.CreateAnd(filter_lv, cond);
3033  }
3034  CHECK(filter_lv->getType()->isIntegerTy(1));
3035  llvm::BasicBlock* sc_false{nullptr};
3036  if (!deferred_quals.empty()) {
3037  auto sc_true = llvm::BasicBlock::Create(
3038  cgen_state_->context_, "sc_true", cgen_state_->current_func_);
3039  sc_false = llvm::BasicBlock::Create(
3040  cgen_state_->context_, "sc_false", cgen_state_->current_func_);
3041  cgen_state_->ir_builder_.CreateCondBr(filter_lv, sc_true, sc_false);
3042  cgen_state_->ir_builder_.SetInsertPoint(sc_false);
3043  if (ra_exe_unit.join_quals.empty()) {
3044  cgen_state_->ir_builder_.CreateRet(cgen_state_->llInt(int32_t(0)));
3045  }
3046  cgen_state_->ir_builder_.SetInsertPoint(sc_true);
3047  filter_lv = cgen_state_->llBool(true);
3048  }
3049  for (auto expr : deferred_quals) {
3050  filter_lv = cgen_state_->ir_builder_.CreateAnd(
3051  filter_lv, code_generator.toBool(code_generator.codegen(expr, true, co).front()));
3052  }
3053 
3054  CHECK(filter_lv->getType()->isIntegerTy(1));
3055  auto ret = group_by_and_aggregate.codegen(
3056  filter_lv, sc_false, query_mem_desc, co, gpu_smem_context);
3057 
3058  // Switch the code generation back to the row function if a filter
3059  // function was enabled.
3060  if (cgen_state_->filter_func_) {
3061  if (cgen_state_->row_func_bb_->getName() == "loop_body") {
3062  cgen_state_->ir_builder_.CreateStore(cgen_state_->llBool(false), loop_done);
3063  cgen_state_->ir_builder_.CreateRet(cgen_state_->llInt<int32_t>(0));
3064  }
3065 
3066  cgen_state_->ir_builder_.SetInsertPoint(cgen_state_->row_func_bb_);
3067  cgen_state_->current_func_ = cgen_state_->row_func_;
3068  cgen_state_->filter_func_call_ =
3069  cgen_state_->ir_builder_.CreateCall(cgen_state_->filter_func_, {});
3070 
3071  // Create real filter function declaration after placeholder call
3072  // is emitted.
3073  redeclareFilterFunction();
3074 
3075  if (cgen_state_->row_func_bb_->getName() == "loop_body") {
3076  auto loop_done_true = llvm::BasicBlock::Create(
3077  cgen_state_->context_, "loop_done_true", cgen_state_->row_func_);
3078  auto loop_done_false = llvm::BasicBlock::Create(
3079  cgen_state_->context_, "loop_done_false", cgen_state_->row_func_);
3080  auto loop_done_flag = cgen_state_->ir_builder_.CreateLoad(loop_done);
3081  cgen_state_->ir_builder_.CreateCondBr(
3082  loop_done_flag, loop_done_true, loop_done_false);
3083  cgen_state_->ir_builder_.SetInsertPoint(loop_done_true);
3084  cgen_state_->ir_builder_.CreateRet(cgen_state_->filter_func_call_);
3085  cgen_state_->ir_builder_.SetInsertPoint(loop_done_false);
3086  } else {
3087  cgen_state_->ir_builder_.CreateRet(cgen_state_->filter_func_call_);
3088  }
3089  }
3090  return ret;
3091 }
bool codegen(llvm::Value *filter_result, llvm::BasicBlock *sc_false, const QueryMemoryDescriptor &query_mem_desc, const CompilationOptions &co, const GpuSharedMemoryContext &gpu_smem_context)
std::unique_ptr< CgenState > cgen_state_
Definition: Execute.h:1010
llvm::Type * get_int_type(const int width, llvm::LLVMContext &context)
std::string to_string(char const *&&v)
const JoinQualsPerNestingLevel join_quals
std::unique_ptr< PlanState > plan_state_
Definition: Execute.h:1025
#define AUTOMATIC_IR_METADATA(CGENSTATE)
static bool prioritizeQuals(const RelAlgExecutionUnit &ra_exe_unit, std::vector< Analyzer::Expr * > &primary_quals, std::vector< Analyzer::Expr * > &deferred_quals, const PlanState::HoistedFiltersSet &hoisted_quals)
Definition: LogicalIR.cpp:157
#define CHECK(condition)
Definition: Logger.h:197
void redeclareFilterFunction()
Definition: IRCodegen.cpp:709
#define VLOG(n)
Definition: Logger.h:291
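
The filter therefore falls into two tiers: the primary quals are AND-ed unconditionally, and only if that conjunction survives does control reach the deferred quals on the sc_true path; otherwise sc_false returns early when there are no join quals. A standalone sketch of that structure:

#include <functional>
#include <vector>

// Sketch only: two-tier qual evaluation mirroring the sc_true / sc_false split above.
bool evaluate_filter_sketch(const std::vector<std::function<bool()>>& primary_quals,
                            const std::vector<std::function<bool()>>& deferred_quals) {
  bool filter = true;
  for (const auto& qual : primary_quals) {
    filter = qual() && filter;  // cheap, prioritized quals first
  }
  if (!filter) {
    return false;  // sc_false: the deferred (expensive) quals are never evaluated
  }
  for (const auto& qual : deferred_quals) {
    filter = qual() && filter;  // sc_true path
  }
  return filter;
}
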
std::tuple< CompilationResult, std::unique_ptr< QueryMemoryDescriptor > > Executor::compileWorkUnit ( const std::vector< InputTableInfo > &  query_infos,
const PlanState::DeletedColumnsMap deleted_cols_map,
const RelAlgExecutionUnit ra_exe_unit,
const CompilationOptions co,
const ExecutionOptions eo,
const CudaMgr_Namespace::CudaMgr cuda_mgr,
const bool  allow_lazy_fetch,
std::shared_ptr< RowSetMemoryOwner row_set_mem_owner,
const size_t  max_groups_buffer_entry_count,
const int8_t  crt_min_byte_width,
const bool  has_cardinality_estimation,
ColumnCacheMap column_cache,
RenderInfo render_info = nullptr 
)
private

Definition at line 2479 of file NativeCodegen.cpp.

2491  {
2492  auto timer = DEBUG_TIMER(__func__);
2493 
2495  const auto cuda_mgr = catalog_->getDataMgr().getCudaMgr();
2496  if (!cuda_mgr) {
2497  throw QueryMustRunOnCpu();
2498  }
2499  }
2500 
2501 #ifndef NDEBUG
2502  static std::uint64_t counter = 0;
2503  ++counter;
2504  VLOG(1) << "CODEGEN #" << counter << ":";
2505  LOG(IR) << "CODEGEN #" << counter << ":";
2506  LOG(PTX) << "CODEGEN #" << counter << ":";
2507  LOG(ASM) << "CODEGEN #" << counter << ":";
2508 #endif
2509 
2510  nukeOldState(allow_lazy_fetch, query_infos, deleted_cols_map, &ra_exe_unit);
2511 
2512  GroupByAndAggregate group_by_and_aggregate(
2513  this,
2514  co.device_type,
2515  ra_exe_unit,
2516  query_infos,
2517  row_set_mem_owner,
2518  has_cardinality_estimation ? std::optional<int64_t>(max_groups_buffer_entry_guess)
2519  : std::nullopt);
2520  auto query_mem_desc =
2521  group_by_and_aggregate.initQueryMemoryDescriptor(eo.allow_multifrag,
2522  max_groups_buffer_entry_guess,
2523  crt_min_byte_width,
2524  render_info,
2526 
2527  if (query_mem_desc->getQueryDescriptionType() ==
2529  !has_cardinality_estimation &&
2530  (!render_info || !render_info->isPotentialInSituRender()) && !eo.just_explain) {
2531  const auto col_range_info = group_by_and_aggregate.getColRangeInfo();
2532  throw CardinalityEstimationRequired(col_range_info.max - col_range_info.min);
2533  }
2534 
2535  const bool output_columnar = query_mem_desc->didOutputColumnar();
2536  const bool gpu_shared_mem_optimization =
2538  ra_exe_unit,
2539  cuda_mgr,
2540  co.device_type,
2541  cuda_mgr ? this->blockSize() : 1,
2542  cuda_mgr ? this->numBlocksPerMP() : 1);
2543  if (gpu_shared_mem_optimization) {
2544  // disable interleaved bins optimization on the GPU
2545  query_mem_desc->setHasInterleavedBinsOnGpu(false);
2546  LOG(DEBUG1) << "GPU shared memory is used for the " +
2547  query_mem_desc->queryDescTypeToString() + " query(" +
2548  std::to_string(get_shared_memory_size(gpu_shared_mem_optimization,
2549  query_mem_desc.get())) +
2550  " out of " + std::to_string(g_gpu_smem_threshold) + " bytes).";
2551  }
2552 
2553  const GpuSharedMemoryContext gpu_smem_context(
2554  get_shared_memory_size(gpu_shared_mem_optimization, query_mem_desc.get()));
2555 
2557  const size_t num_count_distinct_descs =
2558  query_mem_desc->getCountDistinctDescriptorsSize();
2559  for (size_t i = 0; i < num_count_distinct_descs; i++) {
2560  const auto& count_distinct_descriptor =
2561  query_mem_desc->getCountDistinctDescriptor(i);
2562  if (count_distinct_descriptor.impl_type_ == CountDistinctImplType::StdSet ||
2563  (count_distinct_descriptor.impl_type_ != CountDistinctImplType::Invalid &&
2564  !co.hoist_literals)) {
2565  throw QueryMustRunOnCpu();
2566  }
2567  }
2568  }
2569 
2570  // Read the module template and target either CPU or GPU
2571  // by binding the stream position functions to the right implementation:
2572  // stride access for GPU, contiguous for CPU
2573  auto rt_module_copy = llvm::CloneModule(
2574  *g_rt_module.get(), cgen_state_->vmap_, [](const llvm::GlobalValue* gv) {
2575  auto func = llvm::dyn_cast<llvm::Function>(gv);
2576  if (!func) {
2577  return true;
2578  }
2579  return (func->getLinkage() == llvm::GlobalValue::LinkageTypes::PrivateLinkage ||
2580  func->getLinkage() == llvm::GlobalValue::LinkageTypes::InternalLinkage ||
2581  CodeGenerator::alwaysCloneRuntimeFunction(func));
2582  });
2583  if (co.device_type == ExecutorDeviceType::CPU) {
2584  if (is_udf_module_present(true)) {
2585  CodeGenerator::link_udf_module(udf_cpu_module, *rt_module_copy, cgen_state_.get());
2586  }
2587  if (is_rt_udf_module_present(true)) {
2588  CodeGenerator::link_udf_module(
2589  rt_udf_cpu_module, *rt_module_copy, cgen_state_.get());
2590  }
2591  } else {
2592  rt_module_copy->setDataLayout(get_gpu_data_layout());
2593  rt_module_copy->setTargetTriple(get_gpu_target_triple_string());
2594  if (is_udf_module_present()) {
2595  CodeGenerator::link_udf_module(udf_gpu_module, *rt_module_copy, cgen_state_.get());
2596  }
2597  if (is_rt_udf_module_present()) {
2598  CodeGenerator::link_udf_module(
2599  rt_udf_gpu_module, *rt_module_copy, cgen_state_.get());
2600  }
2601  }
2602 
2603  cgen_state_->module_ = rt_module_copy.release();
2604  AUTOMATIC_IR_METADATA(cgen_state_.get());
2605 
2606  auto agg_fnames =
2607  get_agg_fnames(ra_exe_unit.target_exprs, !ra_exe_unit.groupby_exprs.empty());
2608 
2609  const auto agg_slot_count = ra_exe_unit.estimator ? size_t(1) : agg_fnames.size();
2610 
2611  const bool is_group_by{query_mem_desc->isGroupBy()};
2612  auto [query_func, row_func_call] = is_group_by
2613  ? query_group_by_template(cgen_state_->module_,
2614  co.hoist_literals,
2615  *query_mem_desc,
2616  co.device_type,
2617  ra_exe_unit.scan_limit,
2618  gpu_smem_context)
2619  : query_template(cgen_state_->module_,
2620  agg_slot_count,
2621  co.hoist_literals,
2622  !!ra_exe_unit.estimator,
2623  gpu_smem_context);
2624  bind_pos_placeholders("pos_start", true, query_func, cgen_state_->module_);
2625  bind_pos_placeholders("group_buff_idx", false, query_func, cgen_state_->module_);
2626  bind_pos_placeholders("pos_step", false, query_func, cgen_state_->module_);
2627 
2628  cgen_state_->query_func_ = query_func;
2629  cgen_state_->row_func_call_ = row_func_call;
2630  cgen_state_->query_func_entry_ir_builder_.SetInsertPoint(
2631  &query_func->getEntryBlock().front());
2632 
2633  // Generate the function signature and column head fetches s.t.
2634  // double indirection isn't needed in the inner loop
2635  auto& fetch_bb = query_func->front();
2636  llvm::IRBuilder<> fetch_ir_builder(&fetch_bb);
2637  fetch_ir_builder.SetInsertPoint(&*fetch_bb.begin());
2638  auto col_heads = generate_column_heads_load(ra_exe_unit.input_col_descs.size(),
2639  query_func->args().begin(),
2640  fetch_ir_builder,
2641  cgen_state_->context_);
2642  CHECK_EQ(ra_exe_unit.input_col_descs.size(), col_heads.size());
2643 
2644  cgen_state_->row_func_ = create_row_function(ra_exe_unit.input_col_descs.size(),
2645  is_group_by ? 0 : agg_slot_count,
2646  co.hoist_literals,
2647  cgen_state_->module_,
2648  cgen_state_->context_);
2649  CHECK(cgen_state_->row_func_);
2650  cgen_state_->row_func_bb_ =
2651  llvm::BasicBlock::Create(cgen_state_->context_, "entry", cgen_state_->row_func_);
2652 
2653  if (g_enable_filter_function) {
2654  auto filter_func_ft =
2655  llvm::FunctionType::get(get_int_type(32, cgen_state_->context_), {}, false);
2656  cgen_state_->filter_func_ = llvm::Function::Create(filter_func_ft,
2657  llvm::Function::ExternalLinkage,
2658  "filter_func",
2659  cgen_state_->module_);
2660  CHECK(cgen_state_->filter_func_);
2661  cgen_state_->filter_func_bb_ = llvm::BasicBlock::Create(
2662  cgen_state_->context_, "entry", cgen_state_->filter_func_);
2663  }
2664 
2665  cgen_state_->current_func_ = cgen_state_->row_func_;
2666  cgen_state_->ir_builder_.SetInsertPoint(cgen_state_->row_func_bb_);
2667 
2668  preloadFragOffsets(ra_exe_unit.input_descs, query_infos);
2669  RelAlgExecutionUnit body_execution_unit = ra_exe_unit;
2670  const auto join_loops =
2671  buildJoinLoops(body_execution_unit, co, eo, query_infos, column_cache);
2672 
2673  plan_state_->allocateLocalColumnIds(ra_exe_unit.input_col_descs);
2674  const auto is_not_deleted_bb = codegenSkipDeletedOuterTableRow(ra_exe_unit, co);
2675  if (is_not_deleted_bb) {
2676  cgen_state_->row_func_bb_ = is_not_deleted_bb;
2677  }
2678  if (!join_loops.empty()) {
2679  codegenJoinLoops(join_loops,
2680  body_execution_unit,
2681  group_by_and_aggregate,
2682  query_func,
2683  cgen_state_->row_func_bb_,
2684  *(query_mem_desc.get()),
2685  co,
2686  eo);
2687  } else {
2688  const bool can_return_error = compileBody(
2689  ra_exe_unit, group_by_and_aggregate, *query_mem_desc, co, gpu_smem_context);
2690  if (can_return_error || cgen_state_->needs_error_check_ || eo.with_dynamic_watchdog ||
2691  eo.allow_runtime_query_interrupt) {
2692  createErrorCheckControlFlow(query_func,
2693  eo.with_dynamic_watchdog,
2694  eo.allow_runtime_query_interrupt,
2695  co.device_type,
2696  group_by_and_aggregate.query_infos_);
2697  }
2698  }
2699  std::vector<llvm::Value*> hoisted_literals;
2700 
2701  if (co.hoist_literals) {
2702  VLOG(1) << "number of hoisted literals: "
2703  << cgen_state_->query_func_literal_loads_.size()
2704  << " / literal buffer usage: " << cgen_state_->getLiteralBufferUsage(0)
2705  << " bytes";
2706  }
2707 
2708  if (co.hoist_literals && !cgen_state_->query_func_literal_loads_.empty()) {
2709  // we have some hoisted literals...
2710  hoisted_literals = inlineHoistedLiterals();
2711  }
2712 
2713  // replace the row func placeholder call with the call to the actual row func
2714  std::vector<llvm::Value*> row_func_args;
2715  for (size_t i = 0; i < cgen_state_->row_func_call_->getNumArgOperands(); ++i) {
2716  row_func_args.push_back(cgen_state_->row_func_call_->getArgOperand(i));
2717  }
2718  row_func_args.insert(row_func_args.end(), col_heads.begin(), col_heads.end());
2719  row_func_args.push_back(get_arg_by_name(query_func, "join_hash_tables"));
2720  // push hoisted literals arguments, if any
2721  row_func_args.insert(
2722  row_func_args.end(), hoisted_literals.begin(), hoisted_literals.end());
2723  llvm::ReplaceInstWithInst(
2724  cgen_state_->row_func_call_,
2725  llvm::CallInst::Create(cgen_state_->row_func_, row_func_args, ""));
2726 
2727  // replace the filter func placeholder call with the call to the actual filter func
2728  if (cgen_state_->filter_func_) {
2729  std::vector<llvm::Value*> filter_func_args;
2730  for (auto arg_it = cgen_state_->filter_func_args_.begin();
2731  arg_it != cgen_state_->filter_func_args_.end();
2732  ++arg_it) {
2733  filter_func_args.push_back(arg_it->first);
2734  }
2735  llvm::ReplaceInstWithInst(
2736  cgen_state_->filter_func_call_,
2737  llvm::CallInst::Create(cgen_state_->filter_func_, filter_func_args, ""));
2738  }
2739 
2740  // Aggregate
2741  plan_state_->init_agg_vals_ =
2742  init_agg_val_vec(ra_exe_unit.target_exprs, ra_exe_unit.quals, *query_mem_desc);
2743 
2744  /*
2745  * If we have decided to use GPU shared memory (decision is not made here), then
2746  * we generate proper code for extra components that it needs (buffer initialization and
2747  * gpu reduction from shared memory to global memory). We then replace these functions
2748  * into the already compiled query_func (replacing two placeholders, write_back_nop and
2749  * init_smem_nop). The rest of the code should be as before (row_func, etc.).
2750  */
2751  if (gpu_smem_context.isSharedMemoryUsed()) {
2752  if (query_mem_desc->getQueryDescriptionType() ==
2753  QueryDescriptionType::GroupByPerfectHash) {
2754  GpuSharedMemCodeBuilder gpu_smem_code(
2755  cgen_state_->module_,
2756  cgen_state_->context_,
2757  *query_mem_desc,
2758  target_exprs_to_infos(ra_exe_unit.target_exprs, *query_mem_desc),
2759  plan_state_->init_agg_vals_);
2760  gpu_smem_code.codegen();
2761  gpu_smem_code.injectFunctionsInto(query_func);
2762 
2763  // helper functions are used for caching purposes later
2764  cgen_state_->helper_functions_.push_back(gpu_smem_code.getReductionFunction());
2765  cgen_state_->helper_functions_.push_back(gpu_smem_code.getInitFunction());
2766  LOG(IR) << gpu_smem_code.toString();
2767  }
2768  }
2769 
2770  auto multifrag_query_func = cgen_state_->module_->getFunction(
2771  "multifrag_query" + std::string(co.hoist_literals ? "_hoisted_literals" : ""));
2772  CHECK(multifrag_query_func);
2773 
2776  multifrag_query_func, co.hoist_literals, eo.allow_runtime_query_interrupt);
2777  }
2778 
2779  bind_query(query_func,
2780  "query_stub" + std::string(co.hoist_literals ? "_hoisted_literals" : ""),
2781  multifrag_query_func,
2782  cgen_state_->module_);
2783 
2784  std::vector<llvm::Function*> root_funcs{query_func, cgen_state_->row_func_};
2785  if (cgen_state_->filter_func_) {
2786  root_funcs.push_back(cgen_state_->filter_func_);
2787  }
2788  auto live_funcs = CodeGenerator::markDeadRuntimeFuncs(
2789  *cgen_state_->module_, root_funcs, {multifrag_query_func});
2790 
2791  // Always inline the row function and the filter function.
2792  // We don't want register spills in the inner loops.
2793  // LLVM seems to correctly free up alloca instructions
2794  // in these functions even when they are inlined.
2795  mark_function_always_inline(cgen_state_->row_func_);
2796  if (cgen_state_->filter_func_) {
2797  mark_function_always_inline(cgen_state_->filter_func_);
2798  }
2799 
2800 #ifndef NDEBUG
2801  // Add helpful metadata to the LLVM IR for debugging.
2802  AUTOMATIC_IR_METADATA_DONE();
2803 #endif
2804 
2805  // Serialize the important LLVM IR functions to text for SQL EXPLAIN.
2806  std::string llvm_ir;
2807  if (eo.just_explain) {
2808  if (co.explain_type == ExecutorExplainType::Optimized) {
2809 #ifdef WITH_JIT_DEBUG
2810  throw std::runtime_error(
2811  "Explain optimized not available when JIT runtime debug symbols are enabled");
2812 #else
2813  // Note that we don't run the NVVM reflect pass here. Use LOG(IR) to get the
2814  // optimized IR after NVVM reflect
2815  llvm::legacy::PassManager pass_manager;
2816  optimize_ir(query_func, cgen_state_->module_, pass_manager, live_funcs, co);
2817 #endif // WITH_JIT_DEBUG
2818  }
2819  llvm_ir =
2820  serialize_llvm_object(multifrag_query_func) + serialize_llvm_object(query_func) +
2821  serialize_llvm_object(cgen_state_->row_func_) +
2822  (cgen_state_->filter_func_ ? serialize_llvm_object(cgen_state_->filter_func_)
2823  : "");
2824 
2825 #ifndef NDEBUG
2826  llvm_ir += serialize_llvm_metadata_footnotes(query_func, cgen_state_.get());
2827 #endif
2828  }
2829 
2830  LOG(IR) << "\n\n" << query_mem_desc->toString() << "\n";
2831  LOG(IR) << "IR for the "
2832  << (co.device_type == ExecutorDeviceType::CPU ? "CPU:\n" : "GPU:\n");
2833 #ifdef NDEBUG
2834  LOG(IR) << serialize_llvm_object(query_func)
2835  << serialize_llvm_object(cgen_state_->row_func_)
2836  << (cgen_state_->filter_func_ ? serialize_llvm_object(cgen_state_->filter_func_)
2837  : "")
2838  << "\nEnd of IR";
2839 #else
2840  LOG(IR) << serialize_llvm_object(cgen_state_->module_) << "\nEnd of IR";
2841 #endif
2842 
2843  // Run some basic validation checks on the LLVM IR before code is generated below.
2844  verify_function_ir(cgen_state_->row_func_);
2845  if (cgen_state_->filter_func_) {
2846  verify_function_ir(cgen_state_->filter_func_);
2847  }
2848 
2849  // Generate final native code from the LLVM IR.
2850  return std::make_tuple(
2851  CompilationResult{
2852  co.device_type == ExecutorDeviceType::CPU
2853  ? optimizeAndCodegenCPU(query_func, multifrag_query_func, live_funcs, co)
2854  : optimizeAndCodegenGPU(query_func,
2855  multifrag_query_func,
2856  live_funcs,
2857  is_group_by || ra_exe_unit.estimator,
2858  cuda_mgr,
2859  co),
2860  cgen_state_->getLiterals(),
2861  output_columnar,
2862  llvm_ir,
2863  std::move(gpu_smem_context)},
2864  std::move(query_mem_desc));
2865 }
std::tuple< llvm::Function *, llvm::CallInst * > query_group_by_template(llvm::Module *module, const bool hoist_literals, const QueryMemoryDescriptor &query_mem_desc, const ExecutorDeviceType device_type, const bool check_scan_limit, const GpuSharedMemoryContext &gpu_smem_context)
CudaMgr_Namespace::CudaMgr * getCudaMgr() const
Definition: DataMgr.h:207
std::vector< Analyzer::Expr * > target_exprs
#define CHECK_EQ(x, y)
Definition: Logger.h:205
std::unique_ptr< llvm::Module > rt_udf_cpu_module
void codegenJoinLoops(const std::vector< JoinLoop > &join_loops, const RelAlgExecutionUnit &ra_exe_unit, GroupByAndAggregate &group_by_and_aggregate, llvm::Function *query_func, llvm::BasicBlock *entry_bb, const QueryMemoryDescriptor &query_mem_desc, const CompilationOptions &co, const ExecutionOptions &eo)
Definition: IRCodegen.cpp:825
std::unique_ptr< llvm::Module > udf_gpu_module
Data_Namespace::DataMgr & getDataMgr() const
Definition: Catalog.h:222
#define LOG(tag)
Definition: Logger.h:188
std::unique_ptr< llvm::Module > rt_udf_gpu_module
void mark_function_always_inline(llvm::Function *func)
llvm::StringRef get_gpu_data_layout()
bool is_udf_module_present(bool cpu_only=false)
std::vector< InputDescriptor > input_descs
std::string serialize_llvm_metadata_footnotes(llvm::Function *query_func, CgenState *cgen_state)
std::unique_ptr< CgenState > cgen_state_
Definition: Execute.h:1010
const std::list< std::shared_ptr< Analyzer::Expr > > groupby_exprs
unsigned numBlocksPerMP() const
Definition: Execute.cpp:3331
llvm::Type * get_int_type(const int width, llvm::LLVMContext &context)
void optimize_ir(llvm::Function *query_func, llvm::Module *module, llvm::legacy::PassManager &pass_manager, const std::unordered_set< llvm::Function * > &live_funcs, const CompilationOptions &co)
std::vector< std::string > get_agg_fnames(const std::vector< Analyzer::Expr * > &target_exprs, const bool is_group_by)
std::string to_string(char const *&&v)
void preloadFragOffsets(const std::vector< InputDescriptor > &input_descs, const std::vector< InputTableInfo > &query_infos)
Definition: Execute.cpp:3264
bool is_gpu_shared_mem_supported(const QueryMemoryDescriptor *query_mem_desc_ptr, const RelAlgExecutionUnit &ra_exe_unit, const CudaMgr_Namespace::CudaMgr *cuda_mgr, const ExecutorDeviceType device_type, const unsigned gpu_blocksize, const unsigned num_blocks_per_mp)
llvm::StringRef get_gpu_target_triple_string()
bool compileBody(const RelAlgExecutionUnit &ra_exe_unit, GroupByAndAggregate &group_by_and_aggregate, const QueryMemoryDescriptor &query_mem_desc, const CompilationOptions &co, const GpuSharedMemoryContext &gpu_smem_context={})
void verify_function_ir(const llvm::Function *func)
llvm::Value * get_arg_by_name(llvm::Function *func, const std::string &name)
Definition: Execute.h:168
std::shared_ptr< CompilationContext > optimizeAndCodegenGPU(llvm::Function *, llvm::Function *, std::unordered_set< llvm::Function * > &, const bool no_inline, const CudaMgr_Namespace::CudaMgr *cuda_mgr, const CompilationOptions &)
static std::unordered_set< llvm::Function * > markDeadRuntimeFuncs(llvm::Module &module, const std::vector< llvm::Function * > &roots, const std::vector< llvm::Function * > &leaves)
const Catalog_Namespace::Catalog * catalog_
Definition: Execute.h:1054
std::unique_ptr< llvm::Module > g_rt_module
ExecutorExplainType explain_type
std::shared_ptr< CompilationContext > optimizeAndCodegenCPU(llvm::Function *, llvm::Function *, const std::unordered_set< llvm::Function * > &, const CompilationOptions &)
std::unique_ptr< PlanState > plan_state_
Definition: Execute.h:1025
void insertErrorCodeChecker(llvm::Function *query_func, bool hoist_literals, bool allow_runtime_query_interrupt)
static void link_udf_module(const std::unique_ptr< llvm::Module > &udf_module, llvm::Module &module, CgenState *cgen_state, llvm::Linker::Flags flags=llvm::Linker::Flags::None)
const std::shared_ptr< Analyzer::Estimator > estimator
#define AUTOMATIC_IR_METADATA(CGENSTATE)
#define AUTOMATIC_IR_METADATA_DONE()
ExecutorDeviceType device_type
llvm::BasicBlock * codegenSkipDeletedOuterTableRow(const RelAlgExecutionUnit &ra_exe_unit, const CompilationOptions &co)
std::string serialize_llvm_object(const T *llvm_obj)
static bool alwaysCloneRuntimeFunction(const llvm::Function *func)
std::vector< llvm::Value * > generate_column_heads_load(const int num_columns, llvm::Value *byte_stream_arg, llvm::IRBuilder<> &ir_builder, llvm::LLVMContext &ctx)
std::unique_ptr< llvm::Module > udf_cpu_module
bool g_enable_filter_function
Definition: Execute.cpp:79
void bind_pos_placeholders(const std::string &pos_fn_name, const bool use_resume_param, llvm::Function *query_func, llvm::Module *module)
void nukeOldState(const bool allow_lazy_fetch, const std::vector< InputTableInfo > &query_infos, const PlanState::DeletedColumnsMap &deleted_cols_map, const RelAlgExecutionUnit *ra_exe_unit)
Definition: Execute.cpp:3245
std::tuple< llvm::Function *, llvm::CallInst * > query_template(llvm::Module *module, const size_t aggr_col_count, const bool hoist_literals, const bool is_estimate_query, const GpuSharedMemoryContext &gpu_smem_context)
std::list< std::shared_ptr< Analyzer::Expr > > quals
bool isPotentialInSituRender() const
Definition: RenderInfo.cpp:64
#define CHECK(condition)
Definition: Logger.h:197
#define DEBUG_TIMER(name)
Definition: Logger.h:313
std::vector< llvm::Value * > inlineHoistedLiterals()
std::vector< TargetInfo > target_exprs_to_infos(const std::vector< Analyzer::Expr * > &targets, const QueryMemoryDescriptor &query_mem_desc)
llvm::Function * create_row_function(const size_t in_col_count, const size_t agg_col_count, const bool hoist_literals, llvm::Module *module, llvm::LLVMContext &context)
std::list< std::shared_ptr< const InputColDescriptor > > input_col_descs
std::vector< JoinLoop > buildJoinLoops(RelAlgExecutionUnit &ra_exe_unit, const CompilationOptions &co, const ExecutionOptions &eo, const std::vector< InputTableInfo > &query_infos, ColumnCacheMap &column_cache)
Definition: IRCodegen.cpp:279
unsigned blockSize() const
Definition: Execute.cpp:3339
std::vector< int64_t > init_agg_val_vec(const std::vector< TargetInfo > &targets, const QueryMemoryDescriptor &query_mem_desc)
bool is_rt_udf_module_present(bool cpu_only=false)
#define VLOG(n)
Definition: Logger.h:291
size_t get_shared_memory_size(const bool shared_mem_used, const QueryMemoryDescriptor *query_mem_desc_ptr)
void bind_query(llvm::Function *query_func, const std::string &query_fname, llvm::Function *multifrag_query_func, llvm::Module *module)
void createErrorCheckControlFlow(llvm::Function *query_func, bool run_with_dynamic_watchdog, bool run_with_allowing_runtime_interrupt, ExecutorDeviceType device_type, const std::vector< InputTableInfo > &input_table_infos)
size_t g_gpu_smem_threshold
Definition: Execute.cpp:119
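
The runtime-module copy made at lines 2573-2582 above uses LLVM's selective definition cloning: the predicate handed to llvm::CloneModule decides, per global value, whether the copy receives a full body or only a declaration, and the listing keeps bodies only for private/internal runtime functions (plus whatever alwaysCloneRuntimeFunction() whitelists). Below is a minimal standalone sketch of that mechanism, assuming LLVM development headers are available; the module and function names (toy_runtime, row_process, helper) are illustrative stand-ins, not OmniSciDB code.

#include <llvm/IR/IRBuilder.h>
#include <llvm/IR/LLVMContext.h>
#include <llvm/IR/Module.h>
#include <llvm/Support/raw_ostream.h>
#include <llvm/Transforms/Utils/Cloning.h>

// Build a trivial "int f() { return 0; }" with the requested linkage.
static llvm::Function* make_ret0(llvm::Module& m,
                                 llvm::StringRef name,
                                 llvm::GlobalValue::LinkageTypes linkage) {
  auto& ctx = m.getContext();
  auto* fty = llvm::FunctionType::get(llvm::Type::getInt32Ty(ctx), false);
  auto* f = llvm::Function::Create(fty, linkage, name, &m);
  llvm::IRBuilder<> b(llvm::BasicBlock::Create(ctx, "entry", f));
  b.CreateRet(b.getInt32(0));
  return f;
}

int main() {
  llvm::LLVMContext ctx;
  llvm::Module rt("toy_runtime", ctx);
  make_ret0(rt, "row_process", llvm::GlobalValue::ExternalLinkage);
  make_ret0(rt, "helper", llvm::GlobalValue::InternalLinkage);

  llvm::ValueToValueMapTy vmap;
  // Mirror the predicate in the listing: clone definitions only for
  // private/internal functions; externally visible ones are left as
  // declarations in the copy.
  auto copy = llvm::CloneModule(rt, vmap, [](const llvm::GlobalValue* gv) {
    auto* func = llvm::dyn_cast<llvm::Function>(gv);
    if (!func) {
      return true;  // non-function globals are cloned as-is
    }
    return func->getLinkage() == llvm::GlobalValue::LinkageTypes::PrivateLinkage ||
           func->getLinkage() == llvm::GlobalValue::LinkageTypes::InternalLinkage;
  });

  for (auto& f : *copy) {
    llvm::errs() << f.getName() << ": "
                 << (f.isDeclaration() ? "declaration only" : "full definition")
                 << '\n';
  }
  return 0;
}

Running this prints "row_process: declaration only" and "helper: full definition", i.e. only the internal helper keeps its body in the clone.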
AggregatedColRange Executor::computeColRangesCache ( const std::unordered_set< PhysicalInput > &  phys_inputs)
private

Definition at line 3743 of file Execute.cpp.

References catalog_(), CHECK, getLeafColumnRange(), AggregatedColRange::setColRange(), and ExpressionRange::typeSupportsRange().

3744  {
3745  AggregatedColRange agg_col_range_cache;
3746  CHECK(catalog_);
3747  std::unordered_set<int> phys_table_ids;
3748  for (const auto& phys_input : phys_inputs) {
3749  phys_table_ids.insert(phys_input.table_id);
3750  }
3751  std::vector<InputTableInfo> query_infos;
3752  for (const int table_id : phys_table_ids) {
3753  query_infos.emplace_back(InputTableInfo{table_id, getTableInfo(table_id)});
3754  }
3755  for (const auto& phys_input : phys_inputs) {
3756  const auto cd =
3757  catalog_->getMetadataForColumn(phys_input.table_id, phys_input.col_id);
3758  CHECK(cd);
3759  if (ExpressionRange::typeSupportsRange(cd->columnType)) {
3760  const auto col_var = boost::make_unique<Analyzer::ColumnVar>(
3761  cd->columnType, phys_input.table_id, phys_input.col_id, 0);
3762  const auto col_range = getLeafColumnRange(col_var.get(), query_infos, this, false);
3763  agg_col_range_cache.setColRange(phys_input, col_range);
3764  }
3765  }
3766  return agg_col_range_cache;
3767 }
Fragmenter_Namespace::TableInfo getTableInfo(const int table_id) const
Definition: Execute.cpp:292
const Catalog_Namespace::Catalog * catalog_
Definition: Execute.h:1054
const ColumnDescriptor * getMetadataForColumn(int tableId, const std::string &colName) const
ExpressionRange getLeafColumnRange(const Analyzer::ColumnVar *col_expr, const std::vector< InputTableInfo > &query_infos, const Executor *executor, const bool is_outer_join_proj)
#define CHECK(condition)
Definition: Logger.h:197
void setColRange(const PhysicalInput &, const ExpressionRange &)
static bool typeSupportsRange(const SQLTypeInfo &ti)

+ Here is the call graph for this function:

StringDictionaryGenerations Executor::computeStringDictionaryGenerations ( const std::unordered_set< PhysicalInput > &  phys_inputs)
private

Definition at line 3769 of file Execute.cpp.

References catalog_(), CHECK, kENCODING_DICT, and StringDictionaryGenerations::setGeneration().

3770  {
3771  StringDictionaryGenerations string_dictionary_generations;
3772  CHECK(catalog_);
3773  for (const auto& phys_input : phys_inputs) {
3774  const auto cd =
3775  catalog_->getMetadataForColumn(phys_input.table_id, phys_input.col_id);
3776  CHECK(cd);
3777  const auto& col_ti =
3778  cd->columnType.is_array() ? cd->columnType.get_elem_type() : cd->columnType;
3779  if (col_ti.is_string() && col_ti.get_compression() == kENCODING_DICT) {
3780  const int dict_id = col_ti.get_comp_param();
3781  const auto dd = catalog_->getMetadataForDict(dict_id);
3782  CHECK(dd && dd->stringDict);
3783  string_dictionary_generations.setGeneration(dict_id,
3784  dd->stringDict->storageEntryCount());
3785  }
3786  }
3787  return string_dictionary_generations;
3788 }
void setGeneration(const uint32_t id, const uint64_t generation)
const Catalog_Namespace::Catalog * catalog_
Definition: Execute.h:1054
const ColumnDescriptor * getMetadataForColumn(int tableId, const std::string &colName) const
const DictDescriptor * getMetadataForDict(int dict_ref, bool loadDict=true) const
Definition: Catalog.cpp:1444
#define CHECK(condition)
Definition: Logger.h:197

+ Here is the call graph for this function:

TableGenerations Executor::computeTableGenerations ( std::unordered_set< int >  phys_table_ids)
private

Definition at line 3790 of file Execute.cpp.

References TableGenerations::setGeneration().

3791  {
3792  TableGenerations table_generations;
3793  for (const int table_id : phys_table_ids) {
3794  const auto table_info = getTableInfo(table_id);
3795  table_generations.setGeneration(
3796  table_id,
3797  TableGeneration{static_cast<int64_t>(table_info.getPhysicalNumTuples()), 0});
3798  }
3799  return table_generations;
3800 }
Fragmenter_Namespace::TableInfo getTableInfo(const int table_id) const
Definition: Execute.cpp:292
void setGeneration(const uint32_t id, const TableGeneration &generation)

+ Here is the call graph for this function:

bool Executor::containsLeftDeepOuterJoin ( ) const
inline

Definition at line 416 of file Execute.h.

References cgen_state_.

416  {
417  return cgen_state_->contains_left_deep_outer_join_;
418  }
std::unique_ptr< CgenState > cgen_state_
Definition: Execute.h:1010
void Executor::createErrorCheckControlFlow ( llvm::Function *  query_func,
bool  run_with_dynamic_watchdog,
bool  run_with_allowing_runtime_interrupt,
ExecutorDeviceType  device_type,
const std::vector< InputTableInfo > &  input_table_infos 
)
private

Definition at line 1833 of file NativeCodegen.cpp.

1838  {
1839  AUTOMATIC_IR_METADATA(cgen_state_.get());
1840 
1841  // check whether the row processing was successful; currently, it can
1842  // fail by running out of group by buffer slots
1843 
1844  if (run_with_dynamic_watchdog && run_with_allowing_runtime_interrupt) {
1845  // when both dynamic watchdog and runtime interrupt are enabled,
1846  // we prefer dynamic watchdog
1847  run_with_allowing_runtime_interrupt = false;
1848  }
1849 
1850  {
1851  // disable injecting query interrupt checker if the session info is invalid
1852  mapd_shared_lock<mapd_shared_mutex> session_read_lock(executor_session_mutex_);
1853  if (current_query_session_.empty()) {
1854  run_with_allowing_runtime_interrupt = false;
1855  }
1856  }
1857 
1858  llvm::Value* row_count = nullptr;
1859  if ((run_with_dynamic_watchdog || run_with_allowing_runtime_interrupt) &&
1860  device_type == ExecutorDeviceType::GPU) {
1861  row_count =
1862  find_variable_in_basic_block<llvm::LoadInst>(query_func, ".entry", "row_count");
1863  }
1864 
1865  bool done_splitting = false;
1866  for (auto bb_it = query_func->begin(); bb_it != query_func->end() && !done_splitting;
1867  ++bb_it) {
1868  llvm::Value* pos = nullptr;
1869  for (auto inst_it = bb_it->begin(); inst_it != bb_it->end(); ++inst_it) {
1870  if ((run_with_dynamic_watchdog || run_with_allowing_runtime_interrupt) &&
1871  llvm::isa<llvm::PHINode>(*inst_it)) {
1872  if (inst_it->getName() == "pos") {
1873  pos = &*inst_it;
1874  }
1875  continue;
1876  }
1877  if (!llvm::isa<llvm::CallInst>(*inst_it)) {
1878  continue;
1879  }
1880  auto& row_func_call = llvm::cast<llvm::CallInst>(*inst_it);
1881  if (std::string(row_func_call.getCalledFunction()->getName()) == "row_process") {
1882  auto next_inst_it = inst_it;
1883  ++next_inst_it;
1884  auto new_bb = bb_it->splitBasicBlock(next_inst_it);
1885  auto& br_instr = bb_it->back();
1886  llvm::IRBuilder<> ir_builder(&br_instr);
1887  llvm::Value* err_lv = &*inst_it;
1888  llvm::Value* err_lv_returned_from_row_func = nullptr;
1889  if (run_with_dynamic_watchdog) {
1890  CHECK(pos);
1891  llvm::Value* call_watchdog_lv = nullptr;
1892  if (device_type == ExecutorDeviceType::GPU) {
1893  // In order to make sure all threads within a block see the same barrier,
1894  // only blocks in which no thread has crossed the critical edge will go
1895  // through the dynamic watchdog computation
1896  CHECK(row_count);
1897  auto crit_edge_rem =
1898  (blockSize() & (blockSize() - 1))
1899  ? ir_builder.CreateSRem(
1900  row_count,
1901  cgen_state_->llInt(static_cast<int64_t>(blockSize())))
1902  : ir_builder.CreateAnd(
1903  row_count,
1904  cgen_state_->llInt(static_cast<int64_t>(blockSize() - 1)));
1905  auto crit_edge_threshold = ir_builder.CreateSub(row_count, crit_edge_rem);
1906  crit_edge_threshold->setName("crit_edge_threshold");
1907 
1908  // only those threads where pos < crit_edge_threshold go through dynamic
1909  // watchdog call
1910  call_watchdog_lv =
1911  ir_builder.CreateICmp(llvm::ICmpInst::ICMP_SLT, pos, crit_edge_threshold);
1912  } else {
1913  // CPU path: run watchdog for every 64th row
1914  auto dw_predicate = ir_builder.CreateAnd(pos, uint64_t(0x3f));
1915  call_watchdog_lv = ir_builder.CreateICmp(
1916  llvm::ICmpInst::ICMP_EQ, dw_predicate, cgen_state_->llInt(int64_t(0LL)));
1917  }
1918  CHECK(call_watchdog_lv);
1919  auto error_check_bb = bb_it->splitBasicBlock(
1920  llvm::BasicBlock::iterator(br_instr), ".error_check");
1921  auto& watchdog_br_instr = bb_it->back();
1922 
1923  auto watchdog_check_bb = llvm::BasicBlock::Create(
1924  cgen_state_->context_, ".watchdog_check", query_func, error_check_bb);
1925  llvm::IRBuilder<> watchdog_ir_builder(watchdog_check_bb);
1926  auto detected_timeout = watchdog_ir_builder.CreateCall(
1927  cgen_state_->module_->getFunction("dynamic_watchdog"), {});
1928  auto timeout_err_lv = watchdog_ir_builder.CreateSelect(
1929  detected_timeout, cgen_state_->llInt(Executor::ERR_OUT_OF_TIME), err_lv);
1930  watchdog_ir_builder.CreateBr(error_check_bb);
1931 
1932  llvm::ReplaceInstWithInst(
1933  &watchdog_br_instr,
1934  llvm::BranchInst::Create(
1935  watchdog_check_bb, error_check_bb, call_watchdog_lv));
1936  ir_builder.SetInsertPoint(&br_instr);
1937  auto unified_err_lv = ir_builder.CreatePHI(err_lv->getType(), 2);
1938 
1939  unified_err_lv->addIncoming(timeout_err_lv, watchdog_check_bb);
1940  unified_err_lv->addIncoming(err_lv, &*bb_it);
1941  err_lv = unified_err_lv;
1942  } else if (run_with_allowing_runtime_interrupt) {
1943  CHECK(pos);
1944  llvm::Value* call_check_interrupt_lv = nullptr;
1945  if (device_type == ExecutorDeviceType::GPU) {
1946  // approximate how many times the %pos variable is incremented,
1947  // i.e., the number of iterations per thread
1948  // here we derive the bit shift from the grid/block/fragment sizes:
1949  // with a fixed stride (e.g., every 64th increment) some CUDA threads
1950  // would never reach the interrupt-checking block when the outer table
1951  // is small, because a thread may process fewer than 64 rows and so
1952  // could never be interrupted
1953  int32_t num_shift_by_gridDim = shared::getExpOfTwo(gridSize());
1954  int32_t num_shift_by_blockDim = shared::getExpOfTwo(blockSize());
1955  int total_num_shift = num_shift_by_gridDim + num_shift_by_blockDim;
1956  uint64_t interrupt_checking_freq = 32;
1957  auto freq_control_knob = g_running_query_interrupt_freq;
1958  CHECK_GT(freq_control_knob, 0);
1959  CHECK_LE(freq_control_knob, 1.0);
1960  if (!input_table_infos.empty()) {
1961  const auto& outer_table_info = *input_table_infos.begin();
1962  auto num_outer_table_tuples = outer_table_info.info.getNumTuples();
1963  if (outer_table_info.table_id < 0) {
1964  auto* rs = (*outer_table_info.info.fragments.begin()).resultSet;
1965  CHECK(rs);
1966  num_outer_table_tuples = rs->entryCount();
1967  } else {
1968  auto num_frags = outer_table_info.info.fragments.size();
1969  if (num_frags > 0) {
1970  num_outer_table_tuples =
1971  outer_table_info.info.fragments.begin()->getNumTuples();
1972  }
1973  }
1974  if (num_outer_table_tuples > 0) {
1975  // gridSize * blockSize --> pos_step (idx of the next row per thread)
1976  // we additionally multiply pos_step by two since the number of
1977  // dispatched blocks is double the gridSize
1978  // # tuples (of fragment) / pos_step --> maximum # increments (K)
1979  // we also scale K by (1 - freq_control_knob) to control the frequency:
1980  // need to check the interrupt status more often? make K smaller
1981  auto max_inc = uint64_t(
1982  floor(num_outer_table_tuples / (gridSize() * blockSize() * 2)));
1983  if (max_inc < 2) {
1984  // `max_inc` is too small, so this correction is necessary to keep
1985  // `interrupt_checking_freq` valid (i.e., larger than zero)
1986  max_inc = 2;
1987  }
1988  auto calibrated_inc = uint64_t(floor(max_inc * (1 - freq_control_knob)));
1989  interrupt_checking_freq =
1990  uint64_t(pow(2, shared::getExpOfTwo(calibrated_inc)));
1991  // add the coverage when interrupt_checking_freq > K
1992  // if so, some threads still cannot be branched to the interrupt checker
1993  // so we manually use smaller but close to the max_inc as freq
1994  if (interrupt_checking_freq > max_inc) {
1995  interrupt_checking_freq = max_inc / 2;
1996  }
1997  if (interrupt_checking_freq < 8) {
1998  // such a small freq incurs too-frequent interrupt status checking,
1999  // so we clamp it to a reasonable minimum value
2000  interrupt_checking_freq = 8;
2001  }
2002  }
2003  }
2004  VLOG(1) << "Set the running query interrupt checking frequency: "
2005  << interrupt_checking_freq;
2006  // check the interrupt flag for every interrupt_checking_freq-th iteration
2007  llvm::Value* pos_shifted_per_iteration =
2008  ir_builder.CreateLShr(pos, cgen_state_->llInt(total_num_shift));
2009  auto interrupt_predicate =
2010  ir_builder.CreateAnd(pos_shifted_per_iteration, interrupt_checking_freq);
2011  call_check_interrupt_lv =
2012  ir_builder.CreateICmp(llvm::ICmpInst::ICMP_EQ,
2013  interrupt_predicate,
2014  cgen_state_->llInt(int64_t(0LL)));
2015  } else {
2016  // CPU path: run interrupt checker for every 64th row
2017  auto interrupt_predicate = ir_builder.CreateAnd(pos, uint64_t(0x3f));
2018  call_check_interrupt_lv =
2019  ir_builder.CreateICmp(llvm::ICmpInst::ICMP_EQ,
2020  interrupt_predicate,
2021  cgen_state_->llInt(int64_t(0LL)));
2022  }
2023  CHECK(call_check_interrupt_lv);
2024  auto error_check_bb = bb_it->splitBasicBlock(
2025  llvm::BasicBlock::iterator(br_instr), ".error_check");
2026  auto& check_interrupt_br_instr = bb_it->back();
2027 
2028  auto interrupt_check_bb = llvm::BasicBlock::Create(
2029  cgen_state_->context_, ".interrupt_check", query_func, error_check_bb);
2030  llvm::IRBuilder<> interrupt_checker_ir_builder(interrupt_check_bb);
2031  auto detected_interrupt = interrupt_checker_ir_builder.CreateCall(
2032  cgen_state_->module_->getFunction("check_interrupt"), {});
2033  auto interrupt_err_lv = interrupt_checker_ir_builder.CreateSelect(
2034  detected_interrupt, cgen_state_->llInt(Executor::ERR_INTERRUPTED), err_lv);
2035  interrupt_checker_ir_builder.CreateBr(error_check_bb);
2036 
2037  llvm::ReplaceInstWithInst(
2038  &check_interrupt_br_instr,
2039  llvm::BranchInst::Create(
2040  interrupt_check_bb, error_check_bb, call_check_interrupt_lv));
2041  ir_builder.SetInsertPoint(&br_instr);
2042  auto unified_err_lv = ir_builder.CreatePHI(err_lv->getType(), 2);
2043 
2044  unified_err_lv->addIncoming(interrupt_err_lv, interrupt_check_bb);
2045  unified_err_lv->addIncoming(err_lv, &*bb_it);
2046  err_lv = unified_err_lv;
2047  }
2048  if (!err_lv_returned_from_row_func) {
2049  err_lv_returned_from_row_func = err_lv;
2050  }
2051  if (device_type == ExecutorDeviceType::GPU && g_enable_dynamic_watchdog) {
2052  // let kernel execution finish as expected, regardless of the observed error,
2053  // unless it is from the dynamic watchdog where all threads within that block
2054  // return together.
2055  err_lv = ir_builder.CreateICmp(llvm::ICmpInst::ICMP_EQ,
2056  err_lv,
2057  cgen_state_->llInt(Executor::ERR_OUT_OF_TIME));
2058  } else {
2059  err_lv = ir_builder.CreateICmp(llvm::ICmpInst::ICMP_NE,
2060  err_lv,
2061  cgen_state_->llInt(static_cast<int32_t>(0)));
2062  }
2063  auto error_bb = llvm::BasicBlock::Create(
2064  cgen_state_->context_, ".error_exit", query_func, new_bb);
2065  const auto error_code_arg = get_arg_by_name(query_func, "error_code");
2066  llvm::CallInst::Create(
2067  cgen_state_->module_->getFunction("record_error_code"),
2068  std::vector<llvm::Value*>{err_lv_returned_from_row_func, error_code_arg},
2069  "",
2070  error_bb);
2071  llvm::ReturnInst::Create(cgen_state_->context_, error_bb);
2072  llvm::ReplaceInstWithInst(&br_instr,
2073  llvm::BranchInst::Create(error_bb, new_bb, err_lv));
2074  done_splitting = true;
2075  break;
2076  }
2077  }
2078  }
2079  CHECK(done_splitting);
2080 }
static mapd_shared_mutex executor_session_mutex_
Definition: Execute.h:1069
double g_running_query_interrupt_freq
Definition: Execute.cpp:118
static const int32_t ERR_INTERRUPTED
Definition: Execute.h:1117
std::unique_ptr< CgenState > cgen_state_
Definition: Execute.h:1010
bool g_enable_dynamic_watchdog
Definition: Execute.cpp:77
#define CHECK_GT(x, y)
Definition: Logger.h:209
unsigned getExpOfTwo(unsigned n)
Definition: MathUtils.cpp:23
llvm::Value * get_arg_by_name(llvm::Function *func, const std::string &name)
Definition: Execute.h:168
static const int32_t ERR_OUT_OF_TIME
Definition: Execute.h:1116
#define AUTOMATIC_IR_METADATA(CGENSTATE)
static QuerySessionId current_query_session_
Definition: Execute.h:1071
#define CHECK_LE(x, y)
Definition: Logger.h:208
unsigned gridSize() const
Definition: Execute.cpp:3322
#define CHECK(condition)
Definition: Logger.h:197
unsigned blockSize() const
Definition: Execute.cpp:3339
#define VLOG(n)
Definition: Logger.h:291
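
The GPU branch above derives how often each thread polls the interrupt flag from the grid size, block size, outer-table cardinality, and the g_running_query_interrupt_freq knob. The sketch below restates that arithmetic as a standalone function so the calibration is easier to follow; it is illustrative only, the local exp_of_two helper stands in for shared::getExpOfTwo (assumed here to return floor(log2(n))), and the values in main are hypothetical.

#include <cmath>
#include <cstdint>
#include <iostream>

// Assumed behavior of shared::getExpOfTwo: floor(log2(n)), with exp_of_two(0) == 0.
static unsigned exp_of_two(uint64_t n) {
  unsigned e = 0;
  while (n >>= 1) {
    ++e;
  }
  return e;
}

// Mirrors the interrupt-frequency calibration in createErrorCheckControlFlow (GPU path).
static uint64_t interrupt_checking_freq(uint64_t num_outer_table_tuples,
                                        unsigned grid_size,
                                        unsigned block_size,
                                        double freq_control_knob) {
  uint64_t freq = 32;  // default when the outer table cardinality is unknown/zero
  if (num_outer_table_tuples > 0) {
    // pos advances by gridSize * blockSize * 2 per iteration, so this is the
    // maximum number of increments (K) a single thread can observe.
    auto max_inc = static_cast<uint64_t>(
        std::floor(num_outer_table_tuples / (grid_size * block_size * 2.0)));
    if (max_inc < 2) {
      max_inc = 2;
    }
    const auto calibrated_inc =
        static_cast<uint64_t>(std::floor(max_inc * (1 - freq_control_knob)));
    freq = static_cast<uint64_t>(std::pow(2, exp_of_two(calibrated_inc)));
    if (freq > max_inc) {
      freq = max_inc / 2;
    }
    if (freq < 8) {
      freq = 8;  // avoid checking the interrupt flag too often
    }
  }
  return freq;
}

int main() {
  // Hypothetical kernel shape: 80 blocks x 1024 threads, a 100M-row outer
  // table, and a knob value of 0.1 chosen for illustration. Prints 512.
  std::cout << interrupt_checking_freq(100'000'000, 80, 1024, 0.1) << '\n';
  return 0;
}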
std::vector< std::unique_ptr< ExecutionKernel > > Executor::createKernels ( SharedKernelContext shared_context,
const RelAlgExecutionUnit ra_exe_unit,
ColumnFetcher column_fetcher,
const std::vector< InputTableInfo > &  table_infos,
const ExecutionOptions eo,
const bool  is_agg,
const bool  allow_single_frag_table_opt,
const size_t  context_count,
const QueryCompilationDescriptor query_comp_desc,
const QueryMemoryDescriptor query_mem_desc,
RenderInfo render_info,
std::unordered_set< int > &  available_gpus,
int &  available_cpus 
)
private

Determines execution dispatch mode and required fragments for a given query step, then creates kernels to execute the query and returns them for launch.

Definition at line 2044 of file Execute.cpp.

References ExecutionOptions::allow_multifrag, catalog_(), CHECK, CHECK_GE, CHECK_GT, anonymous_namespace{Execute.cpp}::checkWorkUnitWatchdog(), g_inner_join_fragment_skipping, QueryCompilationDescriptor::getDeviceType(), QueryMemoryDescriptor::getEntryCount(), SharedKernelContext::getFragOffsets(), QueryMemoryDescriptor::getQueryDescriptionType(), GPU, ExecutionOptions::gpu_input_mem_limit_percent, Data_Namespace::GPU_LEVEL, anonymous_namespace{Execute.cpp}::has_lazy_fetched_columns(), logger::INFO, RelAlgExecutionUnit::input_descs, KernelPerFragment, LOG, MultifragmentKernel, ExecutionOptions::outer_fragment_indices, Projection, query_mem_desc, RelAlgExecutionUnit::target_exprs, QueryMemoryDescriptor::toString(), RelAlgExecutionUnit::use_bump_allocator, VLOG, and ExecutionOptions::with_watchdog.

2057  {
2058  std::vector<std::unique_ptr<ExecutionKernel>> execution_kernels;
2059 
2060  QueryFragmentDescriptor fragment_descriptor(
2061  ra_exe_unit,
2062  table_infos,
2063  query_comp_desc.getDeviceType() == ExecutorDeviceType::GPU
2064  ? catalog_->getDataMgr().getMemoryInfo(Data_Namespace::GPU_LEVEL)
2065  : std::vector<Data_Namespace::MemoryInfo>{},
2066  eo.gpu_input_mem_limit_percent,
2067  eo.outer_fragment_indices);
2068  CHECK(!ra_exe_unit.input_descs.empty());
2069 
2070  const auto device_type = query_comp_desc.getDeviceType();
2071  const bool uses_lazy_fetch =
2072  plan_state_->allow_lazy_fetch_ &&
2073  has_lazy_fetched_columns(getColLazyFetchInfo(ra_exe_unit.target_exprs));
2074  const bool use_multifrag_kernel = (device_type == ExecutorDeviceType::GPU) &&
2075  eo.allow_multifrag && (!uses_lazy_fetch || is_agg);
2076  const auto device_count = deviceCount(device_type);
2077  CHECK_GT(device_count, 0);
2078 
2079  fragment_descriptor.buildFragmentKernelMap(ra_exe_unit,
2080  shared_context.getFragOffsets(),
2081  device_count,
2082  device_type,
2083  use_multifrag_kernel,
2084  g_inner_join_fragment_skipping,
2085  this);
2086  if (eo.with_watchdog && fragment_descriptor.shouldCheckWorkUnitWatchdog()) {
2087  checkWorkUnitWatchdog(ra_exe_unit, table_infos, *catalog_, device_type, device_count);
2088  }
2089 
2090  if (use_multifrag_kernel) {
2091  VLOG(1) << "Creating multifrag execution kernels";
2092  VLOG(1) << query_mem_desc.toString();
2093 
2094  // NB: We should never be on this path when the query is retried because of running
2095  // out of group by slots; also, for scan only queries on CPU we want the
2096  // high-granularity, fragment by fragment execution instead. For scan only queries on
2097  // GPU, we want the multifrag kernel path to save the overhead of allocating an output
2098  // buffer per fragment.
2099  auto multifrag_kernel_dispatch = [&ra_exe_unit,
2100  &execution_kernels,
2101  &column_fetcher,
2102  &eo,
2103  &query_comp_desc,
2104  &query_mem_desc,
2105  render_info](const int device_id,
2106  const FragmentsList& frag_list,
2107  const int64_t rowid_lookup_key) {
2108  execution_kernels.emplace_back(
2109  std::make_unique<ExecutionKernel>(ra_exe_unit,
2110  ExecutorDeviceType::GPU,
2111  device_id,
2112  eo,
2113  column_fetcher,
2114  query_comp_desc,
2115  query_mem_desc,
2116  frag_list,
2117  ExecutorDispatchMode::MultifragmentKernel,
2118  render_info,
2119  rowid_lookup_key));
2120  };
2121  fragment_descriptor.assignFragsToMultiDispatch(multifrag_kernel_dispatch);
2122  } else {
2123  VLOG(1) << "Creating one execution kernel per fragment";
2124  VLOG(1) << query_mem_desc.toString();
2125 
2126  if (!ra_exe_unit.use_bump_allocator && allow_single_frag_table_opt &&
2127  (query_mem_desc.getQueryDescriptionType() == QueryDescriptionType::Projection) &&
2128  table_infos.size() == 1 && table_infos.front().table_id > 0) {
2129  const auto max_frag_size =
2130  table_infos.front().info.getFragmentNumTuplesUpperBound();
2131  if (max_frag_size < query_mem_desc.getEntryCount()) {
2132  LOG(INFO) << "Lowering scan limit from " << query_mem_desc.getEntryCount()
2133  << " to match max fragment size " << max_frag_size
2134  << " for kernel per fragment execution path.";
2135  throw CompilationRetryNewScanLimit(max_frag_size);
2136  }
2137  }
2138 
2139  size_t frag_list_idx{0};
2140  auto fragment_per_kernel_dispatch = [&ra_exe_unit,
2141  &execution_kernels,
2142  &column_fetcher,
2143  &eo,
2144  &frag_list_idx,
2145  &device_type,
2146  &query_comp_desc,
2147  &query_mem_desc,
2148  render_info](const int device_id,
2149  const FragmentsList& frag_list,
2150  const int64_t rowid_lookup_key) {
2151  if (!frag_list.size()) {
2152  return;
2153  }
2154  CHECK_GE(device_id, 0);
2155 
2156  execution_kernels.emplace_back(
2157  std::make_unique<ExecutionKernel>(ra_exe_unit,
2158  device_type,
2159  device_id,
2160  eo,
2161  column_fetcher,
2162  query_comp_desc,
2163  query_mem_desc,
2164  frag_list,
2165  ExecutorDispatchMode::KernelPerFragment,
2166  render_info,
2167  rowid_lookup_key));
2168  ++frag_list_idx;
2169  };
2170 
2171  fragment_descriptor.assignFragsToKernelDispatch(fragment_per_kernel_dispatch,
2172  ra_exe_unit);
2173  }
2174 
2175  return execution_kernels;
2176 }
bool is_agg(const Analyzer::Expr *expr)
std::vector< Analyzer::Expr * > target_exprs
ExecutorDeviceType getDeviceType() const
const std::vector< uint64_t > & getFragOffsets()
std::string toString() const
Data_Namespace::DataMgr & getDataMgr() const
Definition: Catalog.h:222
#define LOG(tag)
Definition: Logger.h:188
std::vector< size_t > outer_fragment_indices
std::vector< ColumnLazyFetchInfo > getColLazyFetchInfo(const std::vector< Analyzer::Expr * > &target_exprs) const
Definition: Execute.cpp:334
std::vector< InputDescriptor > input_descs
#define CHECK_GE(x, y)
Definition: Logger.h:210
int deviceCount(const ExecutorDeviceType) const
Definition: Execute.cpp:643
#define CHECK_GT(x, y)
Definition: Logger.h:209
std::vector< FragmentsPerTable > FragmentsList
bool g_inner_join_fragment_skipping
Definition: Execute.cpp:85
const Catalog_Namespace::Catalog * catalog_
Definition: Execute.h:1054
std::unique_ptr< PlanState > plan_state_
Definition: Execute.h:1025
void checkWorkUnitWatchdog(const RelAlgExecutionUnit &ra_exe_unit, const std::vector< InputTableInfo > &table_infos, const Catalog_Namespace::Catalog &cat, const ExecutorDeviceType device_type, const int device_count)
Definition: Execute.cpp:1106
std::vector< MemoryInfo > getMemoryInfo(const MemoryLevel memLevel)
Definition: DataMgr.cpp:303
#define CHECK(condition)
Definition: Logger.h:197
double gpu_input_mem_limit_percent
bool has_lazy_fetched_columns(const std::vector< ColumnLazyFetchInfo > &fetched_cols)
Definition: Execute.cpp:2033
#define VLOG(n)
Definition: Logger.h:291

+ Here is the call graph for this function:
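
The key decision above is whether a GPU query step can run as one multi-fragment kernel per device or must fall back to one kernel per fragment. The following sketch condenses that predicate with simplified stand-in types; the enum and function names are illustrative, not the actual OmniSciDB declarations.

#include <cassert>

enum class DeviceType { CPU, GPU };
enum class DispatchMode { MultifragmentKernel, KernelPerFragment };

// Condensed from Executor::createKernels: multi-fragment kernels are a GPU-only
// optimization, enabled by ExecutionOptions::allow_multifrag, and disabled when
// lazily fetched columns are in play unless the step is an aggregation.
DispatchMode choose_dispatch_mode(DeviceType device_type,
                                  bool allow_multifrag,
                                  bool uses_lazy_fetch,
                                  bool is_agg) {
  const bool use_multifrag_kernel = device_type == DeviceType::GPU &&
                                    allow_multifrag && (!uses_lazy_fetch || is_agg);
  return use_multifrag_kernel ? DispatchMode::MultifragmentKernel
                              : DispatchMode::KernelPerFragment;
}

int main() {
  assert(choose_dispatch_mode(DeviceType::GPU, true, false, false) ==
         DispatchMode::MultifragmentKernel);
  assert(choose_dispatch_mode(DeviceType::GPU, true, true, false) ==
         DispatchMode::KernelPerFragment);
  return 0;
}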

int Executor::deviceCount ( const ExecutorDeviceType  device_type) const
private

Definition at line 643 of file Execute.cpp.

References catalog_(), CHECK, and GPU.

643  {
644  if (device_type == ExecutorDeviceType::GPU) {
645  const auto cuda_mgr = catalog_->getDataMgr().getCudaMgr();
646  CHECK(cuda_mgr);
647  return cuda_mgr->getDeviceCount();
648  } else {
649  return 1;
650  }
651 }
CudaMgr_Namespace::CudaMgr * getCudaMgr() const
Definition: DataMgr.h:207
Data_Namespace::DataMgr & getDataMgr() const
Definition: Catalog.h:222
const Catalog_Namespace::Catalog * catalog_
Definition: Execute.h:1054
#define CHECK(condition)
Definition: Logger.h:197

+ Here is the call graph for this function:

int Executor::deviceCountForMemoryLevel ( const Data_Namespace::MemoryLevel  memory_level) const
private

Definition at line 653 of file Execute.cpp.

References CPU, GPU, and Data_Namespace::GPU_LEVEL.

654  {
655  return memory_level == GPU_LEVEL ? deviceCount(ExecutorDeviceType::GPU)
656  : deviceCount(ExecutorDeviceType::CPU);
657 }
ExecutorDeviceType
int deviceCount(const ExecutorDeviceType) const
Definition: Execute.cpp:643
int64_t Executor::deviceCycles ( int  milliseconds) const
private

Definition at line 3353 of file Execute.cpp.

References catalog_(), and CHECK.

3353  {
3354  CHECK(catalog_);
3355  const auto cuda_mgr = catalog_->getDataMgr().getCudaMgr();
3356  CHECK(cuda_mgr);
3357  const auto& dev_props = cuda_mgr->getAllDeviceProperties();
3358  return static_cast<int64_t>(dev_props.front().clockKhz) * milliseconds;
3359 }
CudaMgr_Namespace::CudaMgr * getCudaMgr() const
Definition: DataMgr.h:207
Data_Namespace::DataMgr & getDataMgr() const
Definition: Catalog.h:222
const Catalog_Namespace::Catalog * catalog_
Definition: Execute.h:1054
#define CHECK(condition)
Definition: Logger.h:197

+ Here is the call graph for this function:
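
As a worked example with a hypothetical device: if dev_props.front().clockKhz is 1,410,000 (a 1.41 GHz clock, i.e. 1,410,000 cycles per millisecond), deviceCycles(100) returns 1,410,000 × 100 = 141,000,000 cycles.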

void Executor::enableRuntimeQueryInterrupt ( const double  runtime_query_check_freq,
const unsigned  pending_query_check_freq 
) const

Definition at line 4150 of file Execute.cpp.

References g_enable_runtime_query_interrupt, g_pending_query_interrupt_freq, and g_running_query_interrupt_freq.

4152  {
4153  // The only scenario in which we intentionally call this function is
4154  // to allow runtime query interrupt in QueryRunner for test cases.
4155  // Because a test machine's default setting does not allow runtime query
4156  // interrupt, we have to turn it on within test code when necessary.
4158  g_pending_query_interrupt_freq = pending_query_check_freq;
4159  g_running_query_interrupt_freq = runtime_query_check_freq;
4162  }
4163 }
double g_running_query_interrupt_freq
Definition: Execute.cpp:118
unsigned g_pending_query_interrupt_freq
Definition: Execute.cpp:117
bool g_enable_runtime_query_interrupt
Definition: Execute.cpp:114
void Executor::enrollQuerySession ( const QuerySessionId query_session,
const std::string &  query_str,
const std::string &  submitted_time_str,
const size_t  executor_id,
const QuerySessionStatus::QueryStatus  query_session_status 
)

Definition at line 3957 of file Execute.cpp.

References executor_id_().

3962  {
3963  // enroll the query session into the Executor's session map
3964  mapd_unique_lock<mapd_shared_mutex> session_write_lock(executor_session_mutex_);
3965  if (query_session.empty()) {
3966  return;
3967  }
3968 
3969  addToQuerySessionList(query_session,
3970  query_str,
3971  submitted_time_str,
3972  executor_id,
3973  query_session_status,
3974  session_write_lock);
3975 
3976  if (query_session_status == QuerySessionStatus::QueryStatus::RUNNING) {
3977  current_query_session_ = query_session;
3978  running_query_executor_id_ = executor_id_;
3979  }
3980 }
static mapd_shared_mutex executor_session_mutex_
Definition: Execute.h:1069
const ExecutorId executor_id_
Definition: Execute.h:1053
bool addToQuerySessionList(const QuerySessionId &query_session, const std::string &query_str, const std::string &submitted, const size_t executor_id, const QuerySessionStatus::QueryStatus query_status, mapd_unique_lock< mapd_shared_mutex > &write_lock)
Definition: Execute.cpp:3982
static QuerySessionId current_query_session_
Definition: Execute.h:1071
static size_t running_query_executor_id_
Definition: Execute.h:1073

+ Here is the call graph for this function:

ResultSetPtr Executor::executeExplain ( const QueryCompilationDescriptor query_comp_desc)
private

Definition at line 1716 of file Execute.cpp.

References QueryCompilationDescriptor::getIR().

1716  {
1717  return std::make_shared<ResultSet>(query_comp_desc.getIR());
1718 }

+ Here is the call graph for this function:

int32_t Executor::executePlanWithGroupBy ( const RelAlgExecutionUnit ra_exe_unit,
const CompilationResult compilation_result,
const bool  hoist_literals,
ResultSetPtr results,
const ExecutorDeviceType  device_type,
std::vector< std::vector< const int8_t * >> &  col_buffers,
const std::vector< size_t >  outer_tab_frag_ids,
QueryExecutionContext query_exe_context,
const std::vector< std::vector< int64_t >> &  num_rows,
const std::vector< std::vector< uint64_t >> &  frag_offsets,
Data_Namespace::DataMgr data_mgr,
const int  device_id,
const int  outer_table_id,
const int64_t  limit,
const uint32_t  start_rowid,
const uint32_t  num_tables,
const bool  allow_runtime_interrupt,
RenderInfo render_info 
)
private

Definition at line 3052 of file Execute.cpp.

References CHECK, CHECK_NE, anonymous_namespace{Execute.cpp}::check_rows_less_than_needed(), CPU, DEBUG_TIMER, ERR_DIV_BY_ZERO, ERR_GEOS, ERR_INTERRUPTED, ERR_OUT_OF_TIME, ERR_OVERFLOW_OR_UNDERFLOW, ERR_SINGLE_VALUE_FOUND_MULTIPLE_VALUES, logger::FATAL, g_enable_dynamic_watchdog, CompilationResult::generated_code, QueryMemoryDescriptor::getEntryCount(), QueryExecutionContext::getRowSet(), GpuSharedMemoryContext::getSharedMemorySize(), GPU, CompilationResult::gpu_smem_context, RelAlgExecutionUnit::groupby_exprs, INJECT_TIMER, RelAlgExecutionUnit::input_col_descs, RelAlgExecutionUnit::input_descs, QueryExecutionContext::launchCpuCode(), QueryExecutionContext::launchGpuCode(), CompilationResult::literal_values, LOG, shared::printContainer(), QueryExecutionContext::query_buffers_, QueryExecutionContext::query_mem_desc_, RenderInfo::render_allocator_map_ptr, RelAlgExecutionUnit::scan_limit, QueryMemoryDescriptor::setEntryCount(), RelAlgExecutionUnit::union_all, RenderInfo::useCudaBuffers(), and VLOG.

3070  {
3071  auto timer = DEBUG_TIMER(__func__);
3072  INJECT_TIMER(executePlanWithGroupBy);
3073  CHECK(!results);
3074  if (col_buffers.empty()) {
3075  return 0;
3076  }
3077  CHECK_NE(ra_exe_unit.groupby_exprs.size(), size_t(0));
3078  // TODO(alex):
3079  // 1. Optimize size (make keys more compact).
3080  // 2. Resize on overflow.
3081  // 3. Optimize runtime.
3082  auto hoist_buf = serializeLiterals(compilation_result.literal_values, device_id);
3083  int32_t error_code = device_type == ExecutorDeviceType::GPU ? 0 : start_rowid;
3084  const auto join_hash_table_ptrs = getJoinHashTablePtrs(device_type, device_id);
3085  if (allow_runtime_interrupt) {
3086  bool isInterrupted = false;
3087  {
3088  mapd_shared_lock<mapd_shared_mutex> session_read_lock(executor_session_mutex_);
3089  const auto query_session = getCurrentQuerySession(session_read_lock);
3090  isInterrupted = checkIsQuerySessionInterrupted(query_session, session_read_lock);
3091  }
3092  if (isInterrupted) {
3093  throw QueryExecutionError(ERR_INTERRUPTED);
3094  }
3095  }
3096  if (g_enable_dynamic_watchdog && interrupted_.load()) {
3097  return ERR_INTERRUPTED;
3098  }
3099 
3100  RenderAllocatorMap* render_allocator_map_ptr = nullptr;
3101  if (render_info && render_info->useCudaBuffers()) {
3102  render_allocator_map_ptr = render_info->render_allocator_map_ptr.get();
3103  }
3104 
3105  VLOG(2) << "bool(ra_exe_unit.union_all)=" << bool(ra_exe_unit.union_all)
3106  << " ra_exe_unit.input_descs="
3107  << shared::printContainer(ra_exe_unit.input_descs)
3108  << " ra_exe_unit.input_col_descs="
3109  << shared::printContainer(ra_exe_unit.input_col_descs)
3110  << " ra_exe_unit.scan_limit=" << ra_exe_unit.scan_limit
3111  << " num_rows=" << shared::printContainer(num_rows)
3112  << " frag_offsets=" << shared::printContainer(frag_offsets)
3113  << " query_exe_context->query_buffers_->num_rows_="
3114  << query_exe_context->query_buffers_->num_rows_
3115  << " query_exe_context->query_mem_desc_.getEntryCount()="
3116  << query_exe_context->query_mem_desc_.getEntryCount()
3117  << " device_id=" << device_id << " outer_table_id=" << outer_table_id
3118  << " scan_limit=" << scan_limit << " start_rowid=" << start_rowid
3119  << " num_tables=" << num_tables;
3120 
3121  RelAlgExecutionUnit ra_exe_unit_copy = ra_exe_unit;
3122  // For UNION ALL, filter out input_descs and input_col_descs that are not associated
3123  // with outer_table_id.
3124  if (ra_exe_unit_copy.union_all) {
3125  // Sort outer_table_id first, then pop the rest off of ra_exe_unit_copy.input_descs.
3126  std::stable_sort(ra_exe_unit_copy.input_descs.begin(),
3127  ra_exe_unit_copy.input_descs.end(),
3128  [outer_table_id](auto const& a, auto const& b) {
3129  return a.getTableId() == outer_table_id &&
3130  b.getTableId() != outer_table_id;
3131  });
3132  while (!ra_exe_unit_copy.input_descs.empty() &&
3133  ra_exe_unit_copy.input_descs.back().getTableId() != outer_table_id) {
3134  ra_exe_unit_copy.input_descs.pop_back();
3135  }
3136  // Filter ra_exe_unit_copy.input_col_descs.
3137  ra_exe_unit_copy.input_col_descs.remove_if(
3138  [outer_table_id](auto const& input_col_desc) {
3139  return input_col_desc->getScanDesc().getTableId() != outer_table_id;
3140  });
3141  query_exe_context->query_mem_desc_.setEntryCount(ra_exe_unit_copy.scan_limit);
3142  }
3143 
3144  if (device_type == ExecutorDeviceType::CPU) {
3145  auto cpu_generated_code = std::dynamic_pointer_cast<CpuCompilationContext>(
3146  compilation_result.generated_code);
3147  CHECK(cpu_generated_code);
3148  query_exe_context->launchCpuCode(
3149  ra_exe_unit_copy,
3150  cpu_generated_code.get(),
3151  hoist_literals,
3152  hoist_buf,
3153  col_buffers,
3154  num_rows,
3155  frag_offsets,
3156  ra_exe_unit_copy.union_all ? ra_exe_unit_copy.scan_limit : scan_limit,
3157  &error_code,
3158  num_tables,
3159  join_hash_table_ptrs);
3160  } else {
3161  try {
3162  auto gpu_generated_code = std::dynamic_pointer_cast<GpuCompilationContext>(
3163  compilation_result.generated_code);
3164  CHECK(gpu_generated_code);
3165  query_exe_context->launchGpuCode(
3166  ra_exe_unit_copy,
3167  gpu_generated_code.get(),
3168  hoist_literals,
3169  hoist_buf,
3170  col_buffers,
3171  num_rows,
3172  frag_offsets,
3173  ra_exe_unit_copy.union_all ? ra_exe_unit_copy.scan_limit : scan_limit,
3174  data_mgr,
3175  blockSize(),
3176  gridSize(),
3177  device_id,
3178  compilation_result.gpu_smem_context.getSharedMemorySize(),
3179  &error_code,
3180  num_tables,
3181  allow_runtime_interrupt,
3182  join_hash_table_ptrs,
3183  render_allocator_map_ptr);
3184  } catch (const OutOfMemory&) {
3185  return ERR_OUT_OF_GPU_MEM;
3186  } catch (const OutOfRenderMemory&) {
3187  return ERR_OUT_OF_RENDER_MEM;
3188  } catch (const StreamingTopNNotSupportedInRenderQuery&) {
3189  return ERR_STREAMING_TOP_N_NOT_SUPPORTED_IN_RENDER_QUERY;
3190  } catch (const std::exception& e) {
3191  LOG(FATAL) << "Error launching the GPU kernel: " << e.what();
3192  }
3193  }
3194 
3195  if (error_code == Executor::ERR_OVERFLOW_OR_UNDERFLOW ||
3196  error_code == Executor::ERR_DIV_BY_ZERO ||
3197  error_code == Executor::ERR_OUT_OF_TIME ||
3198  error_code == Executor::ERR_INTERRUPTED ||
3199  error_code == Executor::ERR_SINGLE_VALUE_FOUND_MULTIPLE_VALUES ||
3200  error_code == Executor::ERR_GEOS) {
3201  return error_code;
3202  }
3203 
3204  if (error_code != Executor::ERR_OVERFLOW_OR_UNDERFLOW &&
3205  error_code != Executor::ERR_DIV_BY_ZERO && !render_allocator_map_ptr) {
3206  results = query_exe_context->getRowSet(ra_exe_unit_copy,
3207  query_exe_context->query_mem_desc_);
3208  CHECK(results);
3209  VLOG(2) << "results->rowCount()=" << results->rowCount();
3210  results->holdLiterals(hoist_buf);
3211  }
3212  if (error_code < 0 && render_allocator_map_ptr) {
3213  auto const adjusted_scan_limit =
3214  ra_exe_unit_copy.union_all ? ra_exe_unit_copy.scan_limit : scan_limit;
3215  // More rows passed the filter than available slots. We don't have a count to check,
3216  // so assume we met the limit if a scan limit is set
3217  if (adjusted_scan_limit != 0) {
3218  return 0;
3219  } else {
3220  return error_code;
3221  }
3222  }
3223  if (error_code && (!scan_limit || check_rows_less_than_needed(results, scan_limit))) {
3224  return error_code; // unlucky, not enough results and we ran out of slots
3225  }
3226 
3227  return 0;
3228 }

+ Here is the call graph for this function:

int32_t Executor::executePlanWithoutGroupBy ( const RelAlgExecutionUnit &  ra_exe_unit,
const CompilationResult &  compilation_result,
const bool  hoist_literals,
ResultSetPtr &  results,
const std::vector< Analyzer::Expr * > &  target_exprs,
const ExecutorDeviceType  device_type,
std::vector< std::vector< const int8_t * >> &  col_buffers,
QueryExecutionContext *  query_exe_context,
const std::vector< std::vector< int64_t >> &  num_rows,
const std::vector< std::vector< uint64_t >> &  frag_offsets,
Data_Namespace::DataMgr *  data_mgr,
const int  device_id,
const uint32_t  start_rowid,
const uint32_t  num_tables,
const bool  allow_runtime_interrupt,
RenderInfo *  render_info 
)
private

Definition at line 2840 of file Execute.cpp.

References CHECK, CHECK_EQ, CPU, DEBUG_TIMER, ERR_DIV_BY_ZERO, ERR_GEOS, ERR_INTERRUPTED, ERR_OUT_OF_TIME, ERR_OVERFLOW_OR_UNDERFLOW, ERR_SINGLE_VALUE_FOUND_MULTIPLE_VALUES, RelAlgExecutionUnit::estimator, QueryExecutionContext::estimator_result_set_, logger::FATAL, g_bigint_count, g_enable_dynamic_watchdog, CompilationResult::generated_code, get_target_info(), QueryExecutionContext::getAggInitValForIndex(), QueryMemoryDescriptor::getPaddedSlotWidthBytes(), GpuSharedMemoryContext::getSharedMemorySize(), GPU, CompilationResult::gpu_smem_context, i, INJECT_TIMER, is_distinct_target(), RenderInfo::isPotentialInSituRender(), GpuSharedMemoryContext::isSharedMemoryUsed(), kAPPROX_COUNT_DISTINCT, kAPPROX_MEDIAN, kAVG, kCOUNT, kSAMPLE, QueryExecutionContext::launchCpuCode(), QueryExecutionContext::launchGpuCode(), CompilationResult::literal_values, LOG, QueryExecutionContext::query_buffers_, QueryExecutionContext::query_mem_desc_, reduceResults(), RenderInfo::render_allocator_map_ptr, takes_float_argument(), and RenderInfo::useCudaBuffers().
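Before the listing, a short caller-side sketch of the contract this private method exposes: it returns 0 on success (with results populated with a single-entry ResultSet) or one of the Executor::ERR_* codes referenced above. The retry-on-CPU policy shown here is an assumption for illustration, not a quote of the dispatch logic in executeWorkUnit, and the wrapper names and error values are hypothetical.

#include <cstdint>
#include <stdexcept>
#include <string>

namespace demo {

// Stand-ins for the Executor::ERR_* constants referenced above (values are
// illustrative, not the ones defined in Execute.h).
constexpr int32_t kSuccess = 0;
constexpr int32_t kErrOutOfGpuMem = 1;
constexpr int32_t kErrDivByZero = 2;

// Hypothetical wrapper around Executor::executePlanWithoutGroupBy; stubbed so
// the sketch is self-contained.
int32_t run_without_group_by(bool /*on_gpu*/) {
  return kSuccess;
}

void execute_with_retry() {
  int32_t err = run_without_group_by(/*on_gpu=*/true);
  if (err == kErrOutOfGpuMem) {
    // Plausible policy: fall back to the CPU path when the GPU runs out of memory.
    err = run_without_group_by(/*on_gpu=*/false);
  }
  if (err == kErrDivByZero) {
    throw std::runtime_error("Division by zero");
  }
  if (err != kSuccess) {
    throw std::runtime_error("Query failed with error code " + std::to_string(err));
  }
}

}  // namespace demo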

2856  {
2857  INJECT_TIMER(executePlanWithoutGroupBy);
2858  auto timer = DEBUG_TIMER(__func__);
2859  CHECK(!results);
2860  if (col_buffers.empty()) {
2861  return 0;
2862  }
2863 
2864  RenderAllocatorMap* render_allocator_map_ptr = nullptr;
2865  if (render_info) {
2866  // TODO(adb): make sure that we either never get here in the CPU case, or if we do get
2867  // here, we are in non-insitu mode.
2868  CHECK(render_info->useCudaBuffers() || !render_info->isPotentialInSituRender())
2869  << "CUDA disabled rendering in the executePlanWithoutGroupBy query path is "
2870  "currently unsupported.";
2871  render_allocator_map_ptr = render_info->render_allocator_map_ptr.get();
2872  }
2873 
2874  int32_t error_code = device_type == ExecutorDeviceType::GPU ? 0 : start_rowid;
2875  std::vector<int64_t*> out_vec;
2876  const auto hoist_buf = serializeLiterals(compilation_result.literal_values, device_id);
2877  const auto join_hash_table_ptrs = getJoinHashTablePtrs(device_type, device_id);
2878  std::unique_ptr<OutVecOwner> output_memory_scope;
2879  if (allow_runtime_interrupt) {
2880  bool isInterrupted = false;
2881  {
2882  mapd_shared_lock<mapd_shared_mutex> session_read_lock(executor_session_mutex_);
2883  const auto query_session = getCurrentQuerySession(session_read_lock);
2884  isInterrupted = checkIsQuerySessionInterrupted(query_session, session_read_lock);
2885  }
2886  if (isInterrupted) {
2887  return ERR_INTERRUPTED;
2888  }
2889  }
2890  if (g_enable_dynamic_watchdog && interrupted_.load()) {
2891  return ERR_OUT_OF_TIME;
2892  }
2893  if (device_type == ExecutorDeviceType::CPU) {
2894  auto cpu_generated_code = std::dynamic_pointer_cast<CpuCompilationContext>(
2895  compilation_result.generated_code);
2896  CHECK(cpu_generated_code);
2897  out_vec = query_exe_context->launchCpuCode(ra_exe_unit,
2898  cpu_generated_code.get(),
2899  hoist_literals,
2900  hoist_buf,
2901  col_buffers,
2902  num_rows,
2903  frag_offsets,
2904  0,
2905  &error_code,
2906  num_tables,
2907  join_hash_table_ptrs);
2908  output_memory_scope.reset(new OutVecOwner(out_vec));
2909  } else {
2910  auto gpu_generated_code = std::dynamic_pointer_cast<GpuCompilationContext>(
2911  compilation_result.generated_code);
2912  CHECK(gpu_generated_code);
2913  try {
2914  out_vec = query_exe_context->launchGpuCode(
2915  ra_exe_unit,
2916  gpu_generated_code.get(),
2917  hoist_literals,
2918  hoist_buf,
2919  col_buffers,
2920  num_rows,
2921  frag_offsets,
2922  0,
2923  data_mgr,
2924  blockSize(),
2925  gridSize(),
2926  device_id,
2927  compilation_result.gpu_smem_context.getSharedMemorySize(),
2928  &error_code,
2929  num_tables,
2930  allow_runtime_interrupt,
2931  join_hash_table_ptrs,
2932  render_allocator_map_ptr);
2933  output_memory_scope.reset(new OutVecOwner(out_vec));
2934  } catch (const OutOfMemory&) {
2935  return ERR_OUT_OF_GPU_MEM;
2936  } catch (const std::exception& e) {
2937  LOG(FATAL) << "Error launching the GPU kernel: " << e.what();
2938  }
2939  }
2940  if (error_code == Executor::ERR_OVERFLOW_OR_UNDERFLOW ||
2941  error_code == Executor::ERR_DIV_BY_ZERO ||
2942  error_code == Executor::ERR_OUT_OF_TIME ||
2943  error_code == Executor::ERR_INTERRUPTED ||
2944  error_code == Executor::ERR_SINGLE_VALUE_FOUND_MULTIPLE_VALUES ||
2945  error_code == Executor::ERR_GEOS) {
2946  return error_code;
2947  }
2948  if (ra_exe_unit.estimator) {
2949  CHECK(!error_code);
2950  results =
2951  std::shared_ptr<ResultSet>(query_exe_context->estimator_result_set_.release());
2952  return 0;
2953  }
2954  std::vector<int64_t> reduced_outs;
2955  const auto num_frags = col_buffers.size();
2956  const size_t entry_count =
2957  device_type == ExecutorDeviceType::GPU
2958  ? (compilation_result.gpu_smem_context.isSharedMemoryUsed()
2959  ? 1
2960  : blockSize() * gridSize() * num_frags)
2961  : num_frags;
2962  if (size_t(1) == entry_count) {
2963  for (auto out : out_vec) {
2964  CHECK(out);
2965  reduced_outs.push_back(*out);
2966  }
2967  } else {
2968  size_t out_vec_idx = 0;
2969 
2970  for (const auto target_expr : target_exprs) {
2971  const auto agg_info = get_target_info(target_expr, g_bigint_count);
2972  CHECK(agg_info.is_agg);
2973 
2974  const int num_iterations = agg_info.sql_type.is_geometry()
2975  ? agg_info.sql_type.get_physical_coord_cols()
2976  : 1;
2977 
2978  for (int i = 0; i < num_iterations; i++) {
2979  int64_t val1;
2980  const bool float_argument_input = takes_float_argument(agg_info);
2981  if (is_distinct_target(agg_info) || agg_info.agg_kind == kAPPROX_MEDIAN) {
2982  CHECK(agg_info.agg_kind == kCOUNT ||
2983  agg_info.agg_kind == kAPPROX_COUNT_DISTINCT ||
2984  agg_info.agg_kind == kAPPROX_MEDIAN);
2985  val1 = out_vec[out_vec_idx][0];
2986  error_code = 0;
2987  } else {
2988  const auto chosen_bytes = static_cast<size_t>(
2989  query_exe_context->query_mem_desc_.getPaddedSlotWidthBytes(out_vec_idx));
2990  std::tie(val1, error_code) = Executor::reduceResults(
2991  agg_info.agg_kind,
2992  agg_info.sql_type,
2993  query_exe_context->getAggInitValForIndex(out_vec_idx),
2994  float_argument_input ? sizeof(int32_t) : chosen_bytes,
2995  out_vec[out_vec_idx],
2996  entry_count,
2997  false,
2998  float_argument_input);
2999  }
3000  if (error_code) {
3001  break;
3002  }
3003  reduced_outs.push_back(val1);
3004  if (agg_info.agg_kind == kAVG ||
3005  (agg_info.agg_kind == kSAMPLE &&
3006  (agg_info.sql_type.is_varlen() || agg_info.sql_type.is_geometry()))) {
3007  const auto chosen_bytes = static_cast<size_t>(
3008  query_exe_context->query_mem_desc_.getPaddedSlotWidthBytes(out_vec_idx +
3009  1));
3010  int64_t val2;
3011  std::tie(val2, error_code) = Executor::reduceResults(
3012  agg_info.agg_kind == kAVG ? kCOUNT : agg_info.agg_kind,
3013  agg_info.sql_type,
3014  query_exe_context->getAggInitValForIndex(out_vec_idx + 1),
3015  float_argument_input ? sizeof(int32_t) : chosen_bytes,
3016  out_vec[out_vec_idx + 1],
3017  entry_count,
3018  false,
3019  false);
3020  if (error_code) {
3021  break;
3022  }
3023  reduced_outs.push_back(val2);
3024  ++out_vec_idx;
3025  }
3026  ++out_vec_idx;
3027  }
3028  }
3029  }
3030 
3031  if (error_code) {
3032  return error_code;
3033  }
3034 
3035  CHECK_EQ(size_t(1), query_exe_context->query_buffers_->result_sets_.size());
3036  auto rows_ptr = std::shared_ptr<ResultSet>(
3037  query_exe_context->query_buffers_->result_sets_[0].release());
3038  rows_ptr->fillOneEntry(reduced_outs);
3039  results = std::move(rows_ptr);
3040  return error_code;
3041 }
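To make the reduction step at lines 2954-3029 concrete: after the kernel returns, each output slot holds entry_count partial values (one per GPU block and thread, or one per fragment on CPU) that must be collapsed into a single scalar. The heavy lifting is done by Executor::reduceResults (Execute.cpp:660), which also handles aggregate init values, float-encoded slots, and overflow reporting; the snippet below is only a hedged sketch of the core accumulation for the simplest aggregate kinds, not the actual implementation.

#include <algorithm>
#include <cstdint>
#include <vector>

enum class AggKind { kCount, kSum, kMin, kMax };

// Collapse entry_count partial values for one output slot into a single scalar.
// Not Executor::reduceResults -- just the basic accumulation pattern.
int64_t reduce_partials(const AggKind kind,
                        const std::vector<int64_t>& partials,
                        const int64_t agg_init_val) {
  int64_t acc = agg_init_val;
  for (const int64_t v : partials) {
    switch (kind) {
      case AggKind::kCount:
      case AggKind::kSum:
        acc += v;  // counts and sums simply accumulate
        break;
      case AggKind::kMin:
        acc = std::min(acc, v);
        break;
      case AggKind::kMax:
        acc = std::max(acc, v);
        break;
    }
  }
  return acc;
}

For kAVG the listing reduces two adjacent slots (the running sum and the count reduced as kCOUNT) and leaves the division to the ResultSet, which is why out_vec_idx is advanced twice in that branch.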