OmniSciDB  8fa3bf436f
Executor Class Reference

#include <Execute.h>


Classes

struct  ExecutorMutexHolder
 
class  FetchCacheAnchor
 
struct  GroupColLLVMValue
 
struct  JoinHashTableOrError
 

Public Types

using ExecutorId = size_t
 
using CachedCardinality = std::pair< bool, size_t >
 

Public Member Functions

 Executor (const ExecutorId id, const size_t block_size_x, const size_t grid_size_x, const size_t max_gpu_slab_size, const std::string &debug_dir, const std::string &debug_file)
 
const TemporaryTables * getTemporaryTables ()
 
StringDictionaryProxy * getStringDictionaryProxy (const int dict_id, const bool with_generation) const

StringDictionaryProxy * getStringDictionaryProxy (const int dictId, const std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner, const bool with_generation) const
 
bool isCPUOnly () const
 
bool isArchMaxwell (const ExecutorDeviceType dt) const
 
bool containsLeftDeepOuterJoin () const
 
const ColumnDescriptor * getColumnDescriptor (const Analyzer::ColumnVar *) const

const ColumnDescriptor * getPhysicalColumnDescriptor (const Analyzer::ColumnVar *, int) const

const Catalog_Namespace::Catalog * getCatalog () const
 
void setCatalog (const Catalog_Namespace::Catalog *catalog)
 
const std::shared_ptr< RowSetMemoryOwner > getRowSetMemoryOwner () const

const TemporaryTables * getTemporaryTables () const
 
Fragmenter_Namespace::TableInfo getTableInfo (const int table_id) const
 
const TableGeneration & getTableGeneration (const int table_id) const
 
ExpressionRange getColRange (const PhysicalInput &) const
 
size_t getNumBytesForFetchedRow (const std::set< int > &table_ids_to_fetch) const
 
std::vector< ColumnLazyFetchInfo > getColLazyFetchInfo (const std::vector< Analyzer::Expr * > &target_exprs) const
 
void registerActiveModule (void *module, const int device_id) const
 
void unregisterActiveModule (void *module, const int device_id) const
 
void interrupt (const QuerySessionId &query_session="", const QuerySessionId &interrupt_session="")
 
void resetInterrupt ()
 
void enableRuntimeQueryInterrupt (const double runtime_query_check_freq, const unsigned pending_query_check_freq) const
 
int8_t warpSize () const
 
unsigned gridSize () const
 
unsigned numBlocksPerMP () const
 
unsigned blockSize () const
 
size_t maxGpuSlabSize () const
 
ResultSetPtr executeWorkUnit (size_t &max_groups_buffer_entry_guess, const bool is_agg, const std::vector< InputTableInfo > &, const RelAlgExecutionUnit &, const CompilationOptions &, const ExecutionOptions &options, const Catalog_Namespace::Catalog &, RenderInfo *render_info, const bool has_cardinality_estimation, ColumnCacheMap &column_cache)
 
TableUpdateMetadata executeUpdate (const RelAlgExecutionUnit &ra_exe_unit, const std::vector< InputTableInfo > &table_infos, const CompilationOptions &co, const ExecutionOptions &eo, const Catalog_Namespace::Catalog &cat, std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner, const UpdateLogForFragment::Callback &cb, const bool is_agg)
 
void setupCaching (const std::unordered_set< PhysicalInput > &phys_inputs, const std::unordered_set< int > &phys_table_ids)
 
void setColRangeCache (const AggregatedColRange &aggregated_col_range)
 
QuerySessionId & getCurrentQuerySession (mapd_shared_lock< mapd_shared_mutex > &read_lock)
 
size_t getRunningExecutorId (mapd_shared_lock< mapd_shared_mutex > &read_lock)
 
void setCurrentQuerySession (const QuerySessionId &query_session, mapd_unique_lock< mapd_shared_mutex > &write_lock)
 
void setRunningExecutorId (const size_t id, mapd_unique_lock< mapd_shared_mutex > &write_lock)
 
bool checkCurrentQuerySession (const std::string &candidate_query_session, mapd_shared_lock< mapd_shared_mutex > &read_lock)
 
void invalidateRunningQuerySession (mapd_unique_lock< mapd_shared_mutex > &write_lock)
 
bool addToQuerySessionList (const QuerySessionId &query_session, const std::string &query_str, const std::string &submitted, const size_t executor_id, const QuerySessionStatus::QueryStatus query_status, mapd_unique_lock< mapd_shared_mutex > &write_lock)
 
bool removeFromQuerySessionList (const QuerySessionId &query_session, const std::string &submitted_time_str, mapd_unique_lock< mapd_shared_mutex > &write_lock)
 
void setQuerySessionAsInterrupted (const QuerySessionId &query_session, mapd_unique_lock< mapd_shared_mutex > &write_lock)
 
void resetQuerySessionInterruptFlag (const std::string &query_session, mapd_unique_lock< mapd_shared_mutex > &write_lock)
 
bool checkIsQuerySessionInterrupted (const std::string &query_session, mapd_shared_lock< mapd_shared_mutex > &read_lock)
 
bool checkIsRunningQuerySessionInterrupted ()
 
bool checkIsQuerySessionEnrolled (const QuerySessionId &query_session, mapd_shared_lock< mapd_shared_mutex > &read_lock)
 
bool updateQuerySessionStatusWithLock (const QuerySessionId &query_session, const std::string &submitted_time_str, const QuerySessionStatus::QueryStatus updated_query_status, mapd_unique_lock< mapd_shared_mutex > &write_lock)
 
bool updateQuerySessionExecutorAssignment (const QuerySessionId &query_session, const std::string &submitted_time_str, const size_t executor_id, mapd_unique_lock< mapd_shared_mutex > &write_lock)
 
std::vector< QuerySessionStatus > getQuerySessionInfo (const QuerySessionId &query_session, mapd_shared_lock< mapd_shared_mutex > &read_lock)

mapd_shared_mutex & getSessionLock ()
 
CurrentQueryStatus attachExecutorToQuerySession (const QuerySessionId &query_session_id, const std::string &query_str, const std::string &query_submitted_time)
 
void checkPendingQueryStatus (const QuerySessionId &query_session)
 
void clearQuerySessionStatus (const QuerySessionId &query_session, const std::string &submitted_time_str, const bool acquire_spin_lock)
 
void updateQuerySessionStatus (std::shared_ptr< const query_state::QueryState > &query_state, const QuerySessionStatus::QueryStatus new_query_status)
 
void updateQuerySessionStatus (const QuerySessionId &query_session, const std::string &submitted_time_str, const QuerySessionStatus::QueryStatus new_query_status)
 
void enrollQuerySession (const QuerySessionId &query_session, const std::string &query_str, const std::string &submitted_time_str, const size_t executor_id, const QuerySessionStatus::QueryStatus query_session_status)
 
void addToCardinalityCache (const std::string &cache_key, const size_t cache_value)
 
CachedCardinality getCachedCardinality (const std::string &cache_key)
 
template<typename THREAD_POOL >
void launchKernels (SharedKernelContext &shared_context, std::vector< std::unique_ptr< ExecutionKernel >> &&kernels)
 

Static Public Member Functions

static std::shared_ptr< Executor > getExecutor (const ExecutorId id, const std::string &debug_dir="", const std::string &debug_file="", const SystemParameters &system_parameters=SystemParameters())
 
static void nukeCacheOfExecutors ()
 
static void clearMemory (const Data_Namespace::MemoryLevel memory_level)
 
static size_t getArenaBlockSize ()
 
static std::pair< int64_t, int32_t > reduceResults (const SQLAgg agg, const SQLTypeInfo &ti, const int64_t agg_init_val, const int8_t out_byte_width, const int64_t *out_vec, const size_t out_vec_sz, const bool is_group_by, const bool float_argument_input)
 
static void addCodeToCache (const CodeCacheKey &, std::shared_ptr< CompilationContext >, llvm::Module *, CodeCache &)
 

Static Public Attributes

static const ExecutorId UNITARY_EXECUTOR_ID = 0
 
static const size_t high_scan_limit
 
static const int32_t ERR_DIV_BY_ZERO {1}
 
static const int32_t ERR_OUT_OF_GPU_MEM {2}
 
static const int32_t ERR_OUT_OF_SLOTS {3}
 
static const int32_t ERR_UNSUPPORTED_SELF_JOIN {4}
 
static const int32_t ERR_OUT_OF_RENDER_MEM {5}
 
static const int32_t ERR_OUT_OF_CPU_MEM {6}
 
static const int32_t ERR_OVERFLOW_OR_UNDERFLOW {7}
 
static const int32_t ERR_OUT_OF_TIME {9}
 
static const int32_t ERR_INTERRUPTED {10}
 
static const int32_t ERR_COLUMNAR_CONVERSION_NOT_SUPPORTED {11}
 
static const int32_t ERR_TOO_MANY_LITERALS {12}
 
static const int32_t ERR_STRING_CONST_IN_RESULTSET {13}
 
static const int32_t ERR_STREAMING_TOP_N_NOT_SUPPORTED_IN_RENDER_QUERY {14}
 
static const int32_t ERR_SINGLE_VALUE_FOUND_MULTIPLE_VALUES {15}
 
static const int32_t ERR_GEOS {16}
 
static std::mutex compilation_mutex_
 
static std::mutex kernel_mutex_
 

Private Types

using PerFragmentCallBack = std::function< void(ResultSetPtr, const Fragmenter_Namespace::FragmentInfo &)>
 

Private Member Functions

void clearMetaInfoCache ()
 
int deviceCount (const ExecutorDeviceType) const
 
int deviceCountForMemoryLevel (const Data_Namespace::MemoryLevel memory_level) const
 
llvm::Value * codegenWindowFunction (const size_t target_index, const CompilationOptions &co)
 
llvm::Value * codegenWindowFunctionAggregate (const CompilationOptions &co)
 
llvm::BasicBlock * codegenWindowResetStateControlFlow ()
 
void codegenWindowFunctionStateInit (llvm::Value *aggregate_state)
 
llvm::Value * codegenWindowFunctionAggregateCalls (llvm::Value *aggregate_state, const CompilationOptions &co)
 
void codegenWindowAvgEpilogue (llvm::Value *crt_val, llvm::Value *window_func_null_val, llvm::Value *multiplicity_lv)
 
llvm::Value * codegenAggregateWindowState ()
 
llvm::Value * aggregateWindowStatePtr ()
 
bool isArchPascalOrLater (const ExecutorDeviceType dt) const
 
bool needFetchAllFragments (const InputColDescriptor &col_desc, const RelAlgExecutionUnit &ra_exe_unit, const FragmentsList &selected_fragments) const
 
bool needLinearizeAllFragments (const ColumnDescriptor *cd, const InputColDescriptor &inner_col_desc, const RelAlgExecutionUnit &ra_exe_unit, const FragmentsList &selected_fragments, const Data_Namespace::MemoryLevel memory_level) const
 
void executeWorkUnitPerFragment (const RelAlgExecutionUnit &ra_exe_unit, const InputTableInfo &table_info, const CompilationOptions &co, const ExecutionOptions &eo, const Catalog_Namespace::Catalog &cat, PerFragmentCallBack &cb, const std::set< size_t > &fragment_indexes_param)
 Compiles and dispatches a work unit per fragment processing results with the per fragment callback. Currently used for computing metrics over fragments (metadata). More...
 
ResultSetPtr executeExplain (const QueryCompilationDescriptor &)
 
ResultSetPtr executeTableFunction (const TableFunctionExecutionUnit exe_unit, const std::vector< InputTableInfo > &table_infos, const CompilationOptions &co, const ExecutionOptions &eo, const Catalog_Namespace::Catalog &cat)
 Compiles and dispatches a table function; that is, a function that takes as input one or more columns and returns a ResultSet, which can be parsed by subsequent execution steps. More...
 
ExecutorDeviceType getDeviceTypeForTargets (const RelAlgExecutionUnit &ra_exe_unit, const ExecutorDeviceType requested_device_type)
 
ResultSetPtr collectAllDeviceResults (SharedKernelContext &shared_context, const RelAlgExecutionUnit &ra_exe_unit, const QueryMemoryDescriptor &query_mem_desc, const ExecutorDeviceType device_type, std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner)
 
ResultSetPtr collectAllDeviceShardedTopResults (SharedKernelContext &shared_context, const RelAlgExecutionUnit &ra_exe_unit) const
 
std::unordered_map< int, const Analyzer::BinOper * > getInnerTabIdToJoinCond () const

std::vector< std::unique_ptr< ExecutionKernel > > createKernels (SharedKernelContext &shared_context, const RelAlgExecutionUnit &ra_exe_unit, ColumnFetcher &column_fetcher, const std::vector< InputTableInfo > &table_infos, const ExecutionOptions &eo, const bool is_agg, const bool allow_single_frag_table_opt, const size_t context_count, const QueryCompilationDescriptor &query_comp_desc, const QueryMemoryDescriptor &query_mem_desc, RenderInfo *render_info, std::unordered_set< int > &available_gpus, int &available_cpus)
 
template<typename THREAD_POOL >
void launchKernels (SharedKernelContext &shared_context, std::vector< std::unique_ptr< ExecutionKernel >> &&kernels)
 
std::vector< size_t > getTableFragmentIndices (const RelAlgExecutionUnit &ra_exe_unit, const ExecutorDeviceType device_type, const size_t table_idx, const size_t outer_frag_idx, std::map< int, const TableFragments * > &selected_tables_fragments, const std::unordered_map< int, const Analyzer::BinOper * > &inner_table_id_to_join_condition)
 
bool skipFragmentPair (const Fragmenter_Namespace::FragmentInfo &outer_fragment_info, const Fragmenter_Namespace::FragmentInfo &inner_fragment_info, const int inner_table_id, const std::unordered_map< int, const Analyzer::BinOper * > &inner_table_id_to_join_condition, const RelAlgExecutionUnit &ra_exe_unit, const ExecutorDeviceType device_type)
 
FetchResult fetchChunks (const ColumnFetcher &, const RelAlgExecutionUnit &ra_exe_unit, const int device_id, const Data_Namespace::MemoryLevel, const std::map< int, const TableFragments * > &, const FragmentsList &selected_fragments, const Catalog_Namespace::Catalog &, std::list< ChunkIter > &, std::list< std::shared_ptr< Chunk_NS::Chunk >> &, DeviceAllocator *device_allocator, const size_t thread_idx, const bool allow_runtime_interrupt)
 
FetchResult fetchUnionChunks (const ColumnFetcher &, const RelAlgExecutionUnit &ra_exe_unit, const int device_id, const Data_Namespace::MemoryLevel, const std::map< int, const TableFragments * > &, const FragmentsList &selected_fragments, const Catalog_Namespace::Catalog &, std::list< ChunkIter > &, std::list< std::shared_ptr< Chunk_NS::Chunk >> &, DeviceAllocator *device_allocator, const size_t thread_idx, const bool allow_runtime_interrupt)
 
std::pair< std::vector< std::vector< int64_t > >, std::vector< std::vector< uint64_t > > > getRowCountAndOffsetForAllFrags (const RelAlgExecutionUnit &ra_exe_unit, const CartesianProduct< std::vector< std::vector< size_t >>> &frag_ids_crossjoin, const std::vector< InputDescriptor > &input_descs, const std::map< int, const TableFragments * > &all_tables_fragments)
 
void buildSelectedFragsMapping (std::vector< std::vector< size_t >> &selected_fragments_crossjoin, std::vector< size_t > &local_col_to_frag_pos, const std::list< std::shared_ptr< const InputColDescriptor >> &col_global_ids, const FragmentsList &selected_fragments, const RelAlgExecutionUnit &ra_exe_unit)
 
void buildSelectedFragsMappingForUnion (std::vector< std::vector< size_t >> &selected_fragments_crossjoin, std::vector< size_t > &local_col_to_frag_pos, const std::list< std::shared_ptr< const InputColDescriptor >> &col_global_ids, const FragmentsList &selected_fragments, const RelAlgExecutionUnit &ra_exe_unit)
 
std::vector< size_t > getFragmentCount (const FragmentsList &selected_fragments, const size_t scan_idx, const RelAlgExecutionUnit &ra_exe_unit)
 
int32_t executePlanWithGroupBy (const RelAlgExecutionUnit &ra_exe_unit, const CompilationResult &, const bool hoist_literals, ResultSetPtr &results, const ExecutorDeviceType device_type, std::vector< std::vector< const int8_t * >> &col_buffers, const std::vector< size_t > outer_tab_frag_ids, QueryExecutionContext *, const std::vector< std::vector< int64_t >> &num_rows, const std::vector< std::vector< uint64_t >> &frag_offsets, Data_Namespace::DataMgr *, const int device_id, const int outer_table_id, const int64_t limit, const uint32_t start_rowid, const uint32_t num_tables, const bool allow_runtime_interrupt, RenderInfo *render_info)
 
int32_t executePlanWithoutGroupBy (const RelAlgExecutionUnit &ra_exe_unit, const CompilationResult &, const bool hoist_literals, ResultSetPtr &results, const std::vector< Analyzer::Expr * > &target_exprs, const ExecutorDeviceType device_type, std::vector< std::vector< const int8_t * >> &col_buffers, QueryExecutionContext *query_exe_context, const std::vector< std::vector< int64_t >> &num_rows, const std::vector< std::vector< uint64_t >> &frag_offsets, Data_Namespace::DataMgr *data_mgr, const int device_id, const uint32_t start_rowid, const uint32_t num_tables, const bool allow_runtime_interrupt, RenderInfo *render_info)
 
ResultSetPtr resultsUnion (SharedKernelContext &shared_context, const RelAlgExecutionUnit &ra_exe_unit)
 
std::vector< int64_t > getJoinHashTablePtrs (const ExecutorDeviceType device_type, const int device_id)
 
ResultSetPtr reduceMultiDeviceResults (const RelAlgExecutionUnit &, std::vector< std::pair< ResultSetPtr, std::vector< size_t >>> &all_fragment_results, std::shared_ptr< RowSetMemoryOwner >, const QueryMemoryDescriptor &) const
 
ResultSetPtr reduceMultiDeviceResultSets (std::vector< std::pair< ResultSetPtr, std::vector< size_t >>> &all_fragment_results, std::shared_ptr< RowSetMemoryOwner >, const QueryMemoryDescriptor &) const
 
ResultSetPtr reduceSpeculativeTopN (const RelAlgExecutionUnit &, std::vector< std::pair< ResultSetPtr, std::vector< size_t >>> &all_fragment_results, std::shared_ptr< RowSetMemoryOwner >, const QueryMemoryDescriptor &) const
 
ResultSetPtr executeWorkUnitImpl (size_t &max_groups_buffer_entry_guess, const bool is_agg, const bool allow_single_frag_table_opt, const std::vector< InputTableInfo > &, const RelAlgExecutionUnit &, const CompilationOptions &, const ExecutionOptions &options, const Catalog_Namespace::Catalog &, std::shared_ptr< RowSetMemoryOwner >, RenderInfo *render_info, const bool has_cardinality_estimation, ColumnCacheMap &column_cache)
 
std::vector< llvm::Value * > inlineHoistedLiterals ()
 
std::tuple< CompilationResult, std::unique_ptr< QueryMemoryDescriptor > > compileWorkUnit (const std::vector< InputTableInfo > &query_infos, const PlanState::DeletedColumnsMap &deleted_cols_map, const RelAlgExecutionUnit &ra_exe_unit, const CompilationOptions &co, const ExecutionOptions &eo, const CudaMgr_Namespace::CudaMgr *cuda_mgr, const bool allow_lazy_fetch, std::shared_ptr< RowSetMemoryOwner >, const size_t max_groups_buffer_entry_count, const int8_t crt_min_byte_width, const bool has_cardinality_estimation, ColumnCacheMap &column_cache, RenderInfo *render_info=nullptr)
 
llvm::BasicBlock * codegenSkipDeletedOuterTableRow (const RelAlgExecutionUnit &ra_exe_unit, const CompilationOptions &co)
 
std::vector< JoinLoop > buildJoinLoops (RelAlgExecutionUnit &ra_exe_unit, const CompilationOptions &co, const ExecutionOptions &eo, const std::vector< InputTableInfo > &query_infos, ColumnCacheMap &column_cache)
 
JoinLoop::HoistedFiltersCallback buildHoistLeftHandSideFiltersCb (const RelAlgExecutionUnit &ra_exe_unit, const size_t level_idx, const int inner_table_id, const CompilationOptions &co)
 
std::function< llvm::Value *(const std::vector< llvm::Value * > &, llvm::Value *)> buildIsDeletedCb (const RelAlgExecutionUnit &ra_exe_unit, const size_t level_idx, const CompilationOptions &co)

std::shared_ptr< HashJoin > buildCurrentLevelHashTable (const JoinCondition &current_level_join_conditions, RelAlgExecutionUnit &ra_exe_unit, const CompilationOptions &co, const std::vector< InputTableInfo > &query_infos, ColumnCacheMap &column_cache, std::vector< std::string > &fail_reasons)
 
void redeclareFilterFunction ()
 
llvm::Value * addJoinLoopIterator (const std::vector< llvm::Value * > &prev_iters, const size_t level_idx)
 
void codegenJoinLoops (const std::vector< JoinLoop > &join_loops, const RelAlgExecutionUnit &ra_exe_unit, GroupByAndAggregate &group_by_and_aggregate, llvm::Function *query_func, llvm::BasicBlock *entry_bb, const QueryMemoryDescriptor &query_mem_desc, const CompilationOptions &co, const ExecutionOptions &eo)
 
bool compileBody (const RelAlgExecutionUnit &ra_exe_unit, GroupByAndAggregate &group_by_and_aggregate, const QueryMemoryDescriptor &query_mem_desc, const CompilationOptions &co, const GpuSharedMemoryContext &gpu_smem_context={})
 
void createErrorCheckControlFlow (llvm::Function *query_func, bool run_with_dynamic_watchdog, bool run_with_allowing_runtime_interrupt, ExecutorDeviceType device_type, const std::vector< InputTableInfo > &input_table_infos)
 
void insertErrorCodeChecker (llvm::Function *query_func, bool hoist_literals, bool allow_runtime_query_interrupt)
 
void preloadFragOffsets (const std::vector< InputDescriptor > &input_descs, const std::vector< InputTableInfo > &query_infos)
 
JoinHashTableOrError buildHashTableForQualifier (const std::shared_ptr< Analyzer::BinOper > &qual_bin_oper, const std::vector< InputTableInfo > &query_infos, const MemoryLevel memory_level, const HashType preferred_hash_type, ColumnCacheMap &column_cache, const RegisteredQueryHint &query_hint)
 
void nukeOldState (const bool allow_lazy_fetch, const std::vector< InputTableInfo > &query_infos, const PlanState::DeletedColumnsMap &deleted_cols_map, const RelAlgExecutionUnit *ra_exe_unit)
 
std::shared_ptr< CompilationContext > optimizeAndCodegenCPU (llvm::Function *, llvm::Function *, const std::unordered_set< llvm::Function * > &, const CompilationOptions &)

std::shared_ptr< CompilationContext > optimizeAndCodegenGPU (llvm::Function *, llvm::Function *, std::unordered_set< llvm::Function * > &, const bool no_inline, const CudaMgr_Namespace::CudaMgr *cuda_mgr, const CompilationOptions &)
 
std::string generatePTX (const std::string &) const
 
void initializeNVPTXBackend () const
 
int64_t deviceCycles (int milliseconds) const
 
GroupColLLVMValue groupByColumnCodegen (Analyzer::Expr *group_by_col, const size_t col_width, const CompilationOptions &, const bool translate_null_val, const int64_t translated_null_val, DiamondCodegen &, std::stack< llvm::BasicBlock * > &, const bool thread_mem_shared)
 
llvm::Value * castToFP (llvm::Value *, SQLTypeInfo const &from_ti, SQLTypeInfo const &to_ti)
 
llvm::Value * castToIntPtrTyIn (llvm::Value *val, const size_t bit_width)
 
std::tuple< RelAlgExecutionUnit, PlanState::DeletedColumnsMap > addDeletedColumn (const RelAlgExecutionUnit &ra_exe_unit, const CompilationOptions &co)
 
bool isFragmentFullyDeleted (const int table_id, const Fragmenter_Namespace::FragmentInfo &fragment)
 
std::pair< bool, int64_t > skipFragment (const InputDescriptor &table_desc, const Fragmenter_Namespace::FragmentInfo &frag_info, const std::list< std::shared_ptr< Analyzer::Expr >> &simple_quals, const std::vector< uint64_t > &frag_offsets, const size_t frag_idx)
 
std::pair< bool, int64_t > skipFragmentInnerJoins (const InputDescriptor &table_desc, const RelAlgExecutionUnit &ra_exe_unit, const Fragmenter_Namespace::FragmentInfo &fragment, const std::vector< uint64_t > &frag_offsets, const size_t frag_idx)
 
AggregatedColRange computeColRangesCache (const std::unordered_set< PhysicalInput > &phys_inputs)
 
StringDictionaryGenerations computeStringDictionaryGenerations (const std::unordered_set< PhysicalInput > &phys_inputs)
 
TableGenerations computeTableGenerations (std::unordered_set< int > phys_table_ids)
 
std::shared_ptr< CompilationContext > getCodeFromCache (const CodeCacheKey &, const CodeCache &)
 
std::vector< int8_t > serializeLiterals (const std::unordered_map< int, CgenState::LiteralValues > &literals, const int device_id)
 
llvm::Value * spillDoubleElement (llvm::Value *elem_val, llvm::Type *elem_ty)
 
ExecutorMutexHolder acquireExecuteMutex ()
 

Static Private Member Functions

static size_t align (const size_t off_in, const size_t alignment)
 

Private Attributes

std::unique_ptr< CgenState > cgen_state_

std::unique_ptr< PlanState > plan_state_

std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner_
 
std::mutex gpu_exec_mutex_ [max_gpu_count]
 
std::mutex str_dict_mutex_
 
std::unique_ptr< llvm::TargetMachine > nvptx_target_machine_
 
CodeCache cpu_code_cache_
 
CodeCache gpu_code_cache_
 
const unsigned block_size_x_
 
const unsigned grid_size_x_
 
const size_t max_gpu_slab_size_
 
const std::string debug_dir_
 
const std::string debug_file_
 
const ExecutorId executor_id_
 
const Catalog_Namespace::Catalog * catalog_

const TemporaryTables * temporary_tables_
 
int64_t kernel_queue_time_ms_ = 0
 
int64_t compilation_queue_time_ms_ = 0
 
std::unique_ptr< WindowProjectNodeContext > window_project_node_context_owned_

WindowFunctionContext * active_window_function_ {nullptr}
 
InputTableInfoCache input_table_info_cache_
 
AggregatedColRange agg_col_range_cache_
 
TableGenerations table_generations_
 

Static Private Attributes

static const int max_gpu_count {16}
 
static std::mutex gpu_active_modules_mutex_
 
static uint32_t gpu_active_modules_device_mask_ {0x0}
 
static void * gpu_active_modules_ [max_gpu_count]
 
static std::atomic< bool > interrupted_ {false}
 
static const size_t baseline_threshold
 
static const size_t code_cache_size {1000}
 
static mapd_shared_mutex executor_session_mutex_
 
static QuerySessionId current_query_session_ {""}
 
static size_t running_query_executor_id_ {0}
 
static InterruptFlagMap queries_interrupt_flag_
 
static QuerySessionMap queries_session_map_
 
static std::map< int, std::shared_ptr< Executor > > executors_
 
static std::atomic_flag execute_spin_lock_ = ATOMIC_FLAG_INIT
 
static mapd_shared_mutex execute_mutex_
 
static mapd_shared_mutex executors_cache_mutex_
 
static mapd_shared_mutex recycler_mutex_
 
static std::unordered_map< std::string, size_t > cardinality_cache_
 

Friends

class BaselineJoinHashTable
 
class CodeGenerator
 
class ColumnFetcher
 
struct DiamondCodegen
 
class ExecutionKernel
 
class HashJoin
 
class OverlapsJoinHashTable
 
class GroupByAndAggregate
 
class QueryCompilationDescriptor
 
class QueryMemoryDescriptor
 
class QueryMemoryInitializer
 
class QueryFragmentDescriptor
 
class QueryExecutionContext
 
class ResultSet
 
class InValuesBitmap
 
class LeafAggregator
 
class PerfectJoinHashTable
 
class QueryRewriter
 
class PendingExecutionClosure
 
class RelAlgExecutor
 
class TableOptimizer
 
class TableFunctionCompilationContext
 
class TableFunctionExecutionContext
 
struct TargetExprCodegenBuilder
 
struct TargetExprCodegen
 
class WindowProjectNodeContext
 

Detailed Description

Definition at line 358 of file Execute.h.

Member Typedef Documentation

using Executor::CachedCardinality = std::pair<bool, size_t>

Definition at line 989 of file Execute.h.

using Executor::ExecutorId = size_t

Definition at line 365 of file Execute.h.
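
A minimal usage sketch (not part of the generated documentation): executors are looked up by ExecutorId via getExecutor(), and UNITARY_EXECUTOR_ID names the default instance. The helper function below is hypothetical; the remaining getExecutor() arguments take their defaults.

#include "Execute.h"

std::shared_ptr<Executor> get_default_executor() {
  // Returns the shared Executor instance registered under the unitary id.
  return Executor::getExecutor(Executor::UNITARY_EXECUTOR_ID);
}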

using Executor::PerFragmentCallBack = std::function< void(ResultSetPtr, const Fragmenter_Namespace::FragmentInfo &)>
private

Definition at line 537 of file Execute.h.

Constructor & Destructor Documentation

Executor::Executor ( const ExecutorId  id,
const size_t  block_size_x,
const size_t  grid_size_x,
const size_t  max_gpu_slab_size,
const std::string &  debug_dir,
const std::string &  debug_file 
)

Definition at line 144 of file Execute.cpp.

150  : cgen_state_(new CgenState({}, false))

Member Function Documentation

ExecutorMutexHolder Executor::acquireExecuteMutex ( )
inline private

Definition at line 1089 of file Execute.h.

References execute_mutex_, executor_id_, Executor::ExecutorMutexHolder::shared_lock, Executor::ExecutorMutexHolder::unique_lock, and UNITARY_EXECUTOR_ID.

1089  {
1090  ExecutorMutexHolder ret;
1091  if (executor_id_ == Executor::UNITARY_EXECUTOR_ID) {
1092  // Only one unitary executor can run at a time
1093  ret.unique_lock = mapd_unique_lock<mapd_shared_mutex>(execute_mutex_);
1094  } else {
1095  ret.shared_lock = mapd_shared_lock<mapd_shared_mutex>(execute_mutex_);
1096  }
1097  return ret;
1098  }
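
A hedged usage sketch (not from the source): within the Executor, the returned ExecutorMutexHolder is held RAII-style, so whichever lock was taken is released when the holder goes out of scope.

{
  auto mutex_holder = acquireExecuteMutex();  // unique lock for the unitary executor,
                                              // shared lock for any other executor id
  // ... execute the query work while the lock is held ...
}  // lock released when mutex_holder is destroyed
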
void Executor::addCodeToCache ( const CodeCacheKey &  key,
std::shared_ptr< CompilationContext >  compilation_context,
llvm::Module *  module,
CodeCache &  cache 
)
static

Definition at line 385 of file NativeCodegen.cpp.

References LruCache< key_t, value_t, hash_t >::put().

Referenced by StubGenerator::generateStub().

388  {
389  cache.put(key,
390  std::make_pair<std::shared_ptr<CompilationContext>, decltype(module)>(
391  std::move(compilation_context), std::move(module)));
392 }
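
An illustrative sketch (not from the source) of calling the static addCodeToCache() declared above; the wrapper name and its arguments are hypothetical stand-ins for values produced by the compilation path, and Execute.h is assumed to be included.

void cache_compiled_kernel(const CodeCacheKey& key,
                           std::shared_ptr<CompilationContext> compilation_context,
                           llvm::Module* module,
                           CodeCache& code_cache) {
  // Publish the freshly compiled module under its key so that later,
  // identical queries can reuse it instead of recompiling.
  Executor::addCodeToCache(key, compilation_context, module, code_cache);
}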

std::tuple< RelAlgExecutionUnit, PlanState::DeletedColumnsMap > Executor::addDeletedColumn ( const RelAlgExecutionUnit &  ra_exe_unit,
const CompilationOptions &  co 
)
private

Definition at line 3434 of file Execute.cpp.

References anonymous_namespace{Execute.cpp}::add_deleted_col_to_map(), catalog_(), CHECK, CompilationOptions::filter_on_deleted_column, and TABLE.

3436  {
3437  if (!co.filter_on_deleted_column) {
3438  return std::make_tuple(ra_exe_unit, PlanState::DeletedColumnsMap{});
3439  }
3440  auto ra_exe_unit_with_deleted = ra_exe_unit;
3441  PlanState::DeletedColumnsMap deleted_cols_map;
3442  for (const auto& input_table : ra_exe_unit_with_deleted.input_descs) {
3443  if (input_table.getSourceType() != InputSourceType::TABLE) {
3444  continue;
3445  }
3446  const auto td = catalog_->getMetadataForTable(input_table.getTableId());
3447  CHECK(td);
3448  const auto deleted_cd = catalog_->getDeletedColumnIfRowsDeleted(td);
3449  if (!deleted_cd) {
3450  continue;
3451  }
3452  CHECK(deleted_cd->columnType.is_boolean());
3453  // check deleted column is not already present
3454  bool found = false;
3455  for (const auto& input_col : ra_exe_unit_with_deleted.input_col_descs) {
3456  if (input_col.get()->getColId() == deleted_cd->columnId &&
3457  input_col.get()->getScanDesc().getTableId() == deleted_cd->tableId &&
3458  input_col.get()->getScanDesc().getNestLevel() == input_table.getNestLevel()) {
3459  found = true;
3460  add_deleted_col_to_map(deleted_cols_map, deleted_cd);
3461  break;
3462  }
3463  }
3464  if (!found) {
3465  // add deleted column
3466  ra_exe_unit_with_deleted.input_col_descs.emplace_back(new InputColDescriptor(
3467  deleted_cd->columnId, deleted_cd->tableId, input_table.getNestLevel()));
3468  add_deleted_col_to_map(deleted_cols_map, deleted_cd);
3469  }
3470  }
3471  return std::make_tuple(ra_exe_unit_with_deleted, deleted_cols_map);
3472 }

llvm::Value * Executor::addJoinLoopIterator ( const std::vector< llvm::Value * > &  prev_iters,
const size_t  level_idx 
)
private

Definition at line 806 of file IRCodegen.cpp.

References AUTOMATIC_IR_METADATA, CodeGenerator::cgen_state_, CHECK, and CgenState::scan_idx_to_hash_pos_.

807  {
808  AUTOMATIC_IR_METADATA(cgen_state_.get());
809  // Iterators are added for loop-outer joins when the head of the loop is generated,
810  // then once again when the body if generated. Allow this instead of special handling
811  // of call sites.
812  const auto it = cgen_state_->scan_idx_to_hash_pos_.find(level_idx);
813  if (it != cgen_state_->scan_idx_to_hash_pos_.end()) {
814  return it->second;
815  }
816  CHECK(!prev_iters.empty());
817  llvm::Value* matching_row_index = prev_iters.back();
818  const auto it_ok =
819  cgen_state_->scan_idx_to_hash_pos_.emplace(level_idx, matching_row_index);
820  CHECK(it_ok.second);
821  return matching_row_index;
822 }
void Executor::addToCardinalityCache ( const std::string &  cache_key,
const size_t  cache_value 
)

Definition at line 4167 of file Execute.cpp.

References g_use_estimator_result_cache, and VLOG.

4168  {
4169  if (g_use_estimator_result_cache) {
4170  mapd_unique_lock<mapd_shared_mutex> lock(recycler_mutex_);
4171  cardinality_cache_[cache_key] = cache_value;
4172  VLOG(1) << "Put estimated cardinality to the cache";
4173  }
4174 }
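
A hedged sketch (not from the source) of how this pairs with getCachedCardinality(), whose CachedCardinality result is the std::pair< bool, size_t > documented above (hit flag, cached value). run_estimator_query() is a hypothetical stand-in for the actual estimation step.

size_t run_estimator_query();  // hypothetical, declared elsewhere

size_t cardinality_for(Executor& executor, const std::string& cache_key) {
  const auto cached = executor.getCachedCardinality(cache_key);
  if (cached.first) {
    return cached.second;  // reuse the cached estimate
  }
  const size_t estimated = run_estimator_query();
  executor.addToCardinalityCache(cache_key, estimated);
  return estimated;
}
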
bool Executor::addToQuerySessionList ( const QuerySessionId &  query_session,
const std::string &  query_str,
const std::string &  submitted,
const size_t  executor_id,
const QuerySessionStatus::QueryStatus  query_status,
mapd_unique_lock< mapd_shared_mutex > &  write_lock 
)

Definition at line 3984 of file Execute.cpp.

3989  {
3990  // an internal API that enrolls the query session into the Executor's session map
3991  if (queries_session_map_.count(query_session)) {
3992  if (queries_session_map_.at(query_session).count(submitted_time_str)) {
3993  queries_session_map_.at(query_session).erase(submitted_time_str);
3994  queries_session_map_.at(query_session)
3995  .emplace(submitted_time_str,
3996  QuerySessionStatus(query_session,
3997  executor_id,
3998  query_str,
3999  submitted_time_str,
4000  query_status));
4001  } else {
4002  queries_session_map_.at(query_session)
4003  .emplace(submitted_time_str,
4004  QuerySessionStatus(query_session,
4005  executor_id,
4006  query_str,
4007  submitted_time_str,
4008  query_status));
4009  }
4010  } else {
4011  std::map<std::string, QuerySessionStatus> executor_per_query_map;
4012  executor_per_query_map.emplace(
4013  submitted_time_str,
4014  QuerySessionStatus(
4015  query_session, executor_id, query_str, submitted_time_str, query_status));
4016  queries_session_map_.emplace(query_session, executor_per_query_map);
4017  }
4018  return queries_interrupt_flag_.emplace(query_session, false).second;
4019 }
llvm::Value * Executor::aggregateWindowStatePtr ( )
private

Definition at line 124 of file WindowFunctionIR.cpp.

References AUTOMATIC_IR_METADATA, anonymous_namespace{WindowFunctionIR.cpp}::get_adjusted_window_type_info(), get_int_type(), WindowProjectNodeContext::getActiveWindowFunctionContext(), and kFLOAT.

124  {
125  AUTOMATIC_IR_METADATA(cgen_state_.get());
126  const auto window_func_context =
127  WindowProjectNodeContext::getActiveWindowFunctionContext(this);
128  const auto window_func = window_func_context->getWindowFunction();
129  const auto arg_ti = get_adjusted_window_type_info(window_func);
130  llvm::Type* aggregate_state_type =
131  arg_ti.get_type() == kFLOAT
132  ? llvm::PointerType::get(get_int_type(32, cgen_state_->context_), 0)
133  : llvm::PointerType::get(get_int_type(64, cgen_state_->context_), 0);
134  const auto aggregate_state_i64 = cgen_state_->llInt(
135  reinterpret_cast<const int64_t>(window_func_context->aggregateState()));
136  return cgen_state_->ir_builder_.CreateIntToPtr(aggregate_state_i64,
137  aggregate_state_type);
138 }

static size_t Executor::align ( const size_t  off_in,
const size_t  alignment 
)
inline static private

Definition at line 1001 of file Execute.h.

1001  {
1002  size_t off = off_in;
1003  if (off % alignment != 0) {
1004  off += (alignment - off % alignment);
1005  }
1006  return off;
1007  }
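
A worked example (not from the source): align() rounds an offset up to the next multiple of the alignment and leaves already-aligned offsets unchanged. The standalone copy below mirrors the same rule for illustration.

#include <cassert>
#include <cstddef>

size_t align_up(const size_t off_in, const size_t alignment) {
  size_t off = off_in;
  if (off % alignment != 0) {
    off += (alignment - off % alignment);
  }
  return off;
}

int main() {
  assert(align_up(13, 8) == 16);  // 13 % 8 == 5, so 3 is added
  assert(align_up(16, 8) == 16);  // already aligned, returned as-is
  return 0;
}
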
CurrentQueryStatus Executor::attachExecutorToQuerySession ( const QuerySessionId &  query_session_id,
const std::string &  query_str,
const std::string &  query_submitted_time 
)

Definition at line 3854 of file Execute.cpp.

References executor_id_().

3857  {
3858  if (!query_session_id.empty()) {
3859  // if session is valid, do update 1) the exact executor id and 2) query status
3860  mapd_unique_lock<mapd_shared_mutex> write_lock(executor_session_mutex_);
3861  updateQuerySessionExecutorAssignment(
3862  query_session_id, query_submitted_time, executor_id_, write_lock);
3863  updateQuerySessionStatusWithLock(query_session_id,
3864  query_submitted_time,
3865  QuerySessionStatus::QueryStatus::PENDING_EXECUTOR,
3866  write_lock);
3867  }
3868  return {query_session_id, query_str};
3869 }

unsigned Executor::blockSize ( ) const

Definition at line 3341 of file Execute.cpp.

References block_size_x_(), catalog_(), and CHECK.

3341  {
3342  CHECK(catalog_);
3343  const auto cuda_mgr = catalog_->getDataMgr().getCudaMgr();
3344  if (!cuda_mgr) {
3345  return 0;
3346  }
3347  const auto& dev_props = cuda_mgr->getAllDeviceProperties();
3348  return block_size_x_ ? block_size_x_ : dev_props.front().maxThreadsPerBlock;
3349 }
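
A hedged sketch (not from the source): per the definition above, blockSize() returns the configured block_size_x_ when set, the device's maxThreadsPerBlock otherwise, and 0 when no CudaMgr is available. "executor" below is a hypothetical pointer to an Executor instance.

const unsigned threads_per_block = executor->blockSize();
const unsigned blocks_per_grid = executor->gridSize();
if (threads_per_block == 0) {
  // CPU-only configuration: no GPU launch dimensions are available.
}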

std::shared_ptr< HashJoin > Executor::buildCurrentLevelHashTable ( const JoinCondition &  current_level_join_conditions,
RelAlgExecutionUnit &  ra_exe_unit,
const CompilationOptions &  co,
const std::vector< InputTableInfo > &  query_infos,
ColumnCacheMap &  column_cache,
std::vector< std::string > &  fail_reasons 
)
private

Definition at line 659 of file IRCodegen.cpp.

References anonymous_namespace{IRCodegen.cpp}::add_qualifier_to_execution_unit(), AUTOMATIC_IR_METADATA, CodeGenerator::cgen_state_, anonymous_namespace{IRCodegen.cpp}::check_valid_join_qual(), Data_Namespace::CPU_LEVEL, CompilationOptions::device_type, Executor::JoinHashTableOrError::fail_reason, GPU, Data_Namespace::GPU_LEVEL, Executor::JoinHashTableOrError::hash_table, INNER, IS_EQUIVALENCE, PlanState::join_info_, OneToOne, CodeGenerator::plan_state_, JoinCondition::quals, RelAlgExecutionUnit::query_hint, and JoinCondition::type.

665  {
666  AUTOMATIC_IR_METADATA(cgen_state_.get());
667  if (current_level_join_conditions.type != JoinType::INNER &&
668  current_level_join_conditions.quals.size() > 1) {
669  fail_reasons.emplace_back("No equijoin expression found for outer join");
670  return nullptr;
671  }
672  std::shared_ptr<HashJoin> current_level_hash_table;
673  for (const auto& join_qual : current_level_join_conditions.quals) {
674  auto qual_bin_oper = std::dynamic_pointer_cast<Analyzer::BinOper>(join_qual);
675  if (!qual_bin_oper || !IS_EQUIVALENCE(qual_bin_oper->get_optype())) {
676  fail_reasons.emplace_back("No equijoin expression found");
677  if (current_level_join_conditions.type == JoinType::INNER) {
678  add_qualifier_to_execution_unit(ra_exe_unit, join_qual);
679  }
680  continue;
681  }
682  check_valid_join_qual(qual_bin_oper);
683  JoinHashTableOrError hash_table_or_error;
684  if (!current_level_hash_table) {
685  hash_table_or_error = buildHashTableForQualifier(
686  qual_bin_oper,
687  query_infos,
688  co.device_type == ExecutorDeviceType::GPU ? MemoryLevel::GPU_LEVEL
689  : MemoryLevel::CPU_LEVEL,
690  HashType::OneToOne,
691  column_cache,
692  ra_exe_unit.query_hint);
693  current_level_hash_table = hash_table_or_error.hash_table;
694  }
695  if (hash_table_or_error.hash_table) {
696  plan_state_->join_info_.join_hash_tables_.push_back(hash_table_or_error.hash_table);
697  plan_state_->join_info_.equi_join_tautologies_.push_back(qual_bin_oper);
698  } else {
699  fail_reasons.push_back(hash_table_or_error.fail_reason);
700  if (current_level_join_conditions.type == JoinType::INNER) {
701  add_qualifier_to_execution_unit(ra_exe_unit, qual_bin_oper);
702  }
703  }
704  }
705  return current_level_hash_table;
706 }

Executor::JoinHashTableOrError Executor::buildHashTableForQualifier ( const std::shared_ptr< Analyzer::BinOper > &  qual_bin_oper,
const std::vector< InputTableInfo > &  query_infos,
const MemoryLevel  memory_level,
const HashType  preferred_hash_type,
ColumnCacheMap &  column_cache,
const RegisteredQueryHint &  query_hint 
)
private

Definition at line 3287 of file Execute.cpp.

References g_enable_dynamic_watchdog, g_enable_overlaps_hashjoin, and HashJoin::getInstance().

3293  {
3294  if (!g_enable_overlaps_hashjoin && qual_bin_oper->is_overlaps_oper()) {
3295  return {nullptr, "Overlaps hash join disabled, attempting to fall back to loop join"};
3296  }
3297  if (g_enable_dynamic_watchdog && interrupted_.load()) {
3298  throw QueryExecutionError(ERR_INTERRUPTED);
3299  }
3300  try {
3301  auto tbl = HashJoin::getInstance(qual_bin_oper,
3302  query_infos,
3303  memory_level,
3304  preferred_hash_type,
3305  deviceCountForMemoryLevel(memory_level),
3306  column_cache,
3307  this,
3308  query_hint);
3309  return {tbl, ""};
3310  } catch (const HashJoinFail& e) {
3311  return {nullptr, e.what()};
3312  }
3313 }

JoinLoop::HoistedFiltersCallback Executor::buildHoistLeftHandSideFiltersCb ( const RelAlgExecutionUnit &  ra_exe_unit,
const size_t  level_idx,
const int  inner_table_id,
const CompilationOptions &  co 
)
private

Definition at line 492 of file IRCodegen.cpp.

References AUTOMATIC_IR_METADATA, CodeGenerator::cgen_state_, CHECK, CodeGenerator::codegen(), g_enable_left_join_filter_hoisting, PlanState::hoisted_filters_, RelAlgExecutionUnit::join_quals, LEFT, CgenState::llBool(), CodeGenerator::plan_state_, RelAlgExecutionUnit::quals, RelAlgExecutionUnit::simple_quals, CodeGenerator::toBool(), and VLOG.

496  {
497  if (!g_enable_left_join_filter_hoisting) {
498  return nullptr;
499  }
500 
501  const auto& current_level_join_conditions = ra_exe_unit.join_quals[level_idx];
502  if (current_level_join_conditions.type == JoinType::LEFT) {
503  const auto& condition = current_level_join_conditions.quals.front();
504  const auto bin_oper = dynamic_cast<const Analyzer::BinOper*>(condition.get());
505  CHECK(bin_oper) << condition->toString();
506  const auto rhs =
507  dynamic_cast<const Analyzer::ColumnVar*>(bin_oper->get_right_operand());
508  const auto lhs =
509  dynamic_cast<const Analyzer::ColumnVar*>(bin_oper->get_left_operand());
510  if (lhs && rhs && lhs->get_table_id() != rhs->get_table_id()) {
511  const Analyzer::ColumnVar* selected_lhs{nullptr};
512  // grab the left hand side column -- this is somewhat similar to normalize column
513  // pair, and a better solution may be to hoist that function out of the join
514  // framework and normalize columns at the top of build join loops
515  if (lhs->get_table_id() == inner_table_id) {
516  selected_lhs = rhs;
517  } else if (rhs->get_table_id() == inner_table_id) {
518  selected_lhs = lhs;
519  }
520  if (selected_lhs) {
521  std::list<std::shared_ptr<Analyzer::Expr>> hoisted_quals;
522  // get all LHS-only filters
523  auto should_hoist_qual = [&hoisted_quals](const auto& qual, const int table_id) {
524  CHECK(qual);
525 
526  ExprTableIdVisitor visitor;
527  const auto table_ids = visitor.visit(qual.get());
528  if (table_ids.size() == 1 && table_ids.find(table_id) != table_ids.end()) {
529  hoisted_quals.push_back(qual);
530  }
531  };
532  for (const auto& qual : ra_exe_unit.simple_quals) {
533  should_hoist_qual(qual, selected_lhs->get_table_id());
534  }
535  for (const auto& qual : ra_exe_unit.quals) {
536  should_hoist_qual(qual, selected_lhs->get_table_id());
537  }
538 
539  // build the filters callback and return it
540  if (!hoisted_quals.empty()) {
541  return [this, hoisted_quals, co](llvm::BasicBlock* true_bb,
542  llvm::BasicBlock* exit_bb,
543  const std::string& loop_name,
544  llvm::Function* parent_func,
545  CgenState* cgen_state) -> llvm::BasicBlock* {
546  // make sure we have quals to hoist
547  bool has_quals_to_hoist = false;
548  for (const auto& qual : hoisted_quals) {
549  // check to see if the filter was previously hoisted. if all filters were
550  // previously hoisted, this callback becomes a noop
551  if (plan_state_->hoisted_filters_.count(qual) == 0) {
552  has_quals_to_hoist = true;
553  break;
554  }
555  }
556 
557  if (!has_quals_to_hoist) {
558  return nullptr;
559  }
560 
561  AUTOMATIC_IR_METADATA(cgen_state);
562 
563  llvm::IRBuilder<>& builder = cgen_state->ir_builder_;
564  auto& context = builder.getContext();
565 
566  const auto filter_bb =
567  llvm::BasicBlock::Create(context,
568  "hoisted_left_join_filters_" + loop_name,
569  parent_func,
570  /*insert_before=*/true_bb);
571  builder.SetInsertPoint(filter_bb);
572 
573  llvm::Value* filter_lv = cgen_state_->llBool(true);
574  CodeGenerator code_generator(this);
576  for (const auto& qual : hoisted_quals) {
577  if (plan_state_->hoisted_filters_.insert(qual).second) {
578  // qual was inserted into the hoisted filters map, which means we have not
579  // seen this qual before. Generate filter.
580  VLOG(1) << "Generating code for hoisted left hand side qualifier "
581  << qual->toString();
582  auto cond = code_generator.toBool(
583  code_generator.codegen(qual.get(), true, co).front());
584  filter_lv = builder.CreateAnd(filter_lv, cond);
585  }
586  }
587  CHECK(filter_lv->getType()->isIntegerTy(1));
588 
589  builder.CreateCondBr(filter_lv, true_bb, exit_bb);
590  return filter_bb;
591  };
592  }
593  }
594  }
595  }
596  return nullptr;
597 }

std::function< llvm::Value *(const std::vector< llvm::Value * > &, llvm::Value *)> Executor::buildIsDeletedCb ( const RelAlgExecutionUnit &  ra_exe_unit,
const size_t  level_idx,
const CompilationOptions &  co 
)
private

Definition at line 600 of file IRCodegen.cpp.

References AUTOMATIC_IR_METADATA, CodeGenerator::cgen_state_, CHECK, CHECK_LT, CodeGenerator::codegen(), CgenState::context_, CgenState::current_func_, CompilationOptions::filter_on_deleted_column, PlanState::getDeletedColForTable(), RelAlgExecutionUnit::input_descs, CgenState::ir_builder_, CgenState::llBool(), CgenState::llInt(), CodeGenerator::plan_state_, TABLE, and CodeGenerator::toBool().

602  {
603  AUTOMATIC_IR_METADATA(cgen_state_.get());
604  if (!co.filter_on_deleted_column) {
605  return nullptr;
606  }
607  CHECK_LT(level_idx + 1, ra_exe_unit.input_descs.size());
608  const auto input_desc = ra_exe_unit.input_descs[level_idx + 1];
609  if (input_desc.getSourceType() != InputSourceType::TABLE) {
610  return nullptr;
611  }
612 
613  const auto deleted_cd = plan_state_->getDeletedColForTable(input_desc.getTableId());
614  if (!deleted_cd) {
615  return nullptr;
616  }
617  CHECK(deleted_cd->columnType.is_boolean());
618  const auto deleted_expr = makeExpr<Analyzer::ColumnVar>(deleted_cd->columnType,
619  input_desc.getTableId(),
620  deleted_cd->columnId,
621  input_desc.getNestLevel());
622  return [this, deleted_expr, level_idx, &co](const std::vector<llvm::Value*>& prev_iters,
623  llvm::Value* have_more_inner_rows) {
624  const auto matching_row_index = addJoinLoopIterator(prev_iters, level_idx + 1);
625  // Avoid fetching the deleted column from a position which is not valid.
626  // An invalid position can be returned by a one to one hash lookup (negative)
627  // or at the end of iteration over a set of matching values.
628  llvm::Value* is_valid_it{nullptr};
629  if (have_more_inner_rows) {
630  is_valid_it = have_more_inner_rows;
631  } else {
632  is_valid_it = cgen_state_->ir_builder_.CreateICmp(
633  llvm::ICmpInst::ICMP_SGE, matching_row_index, cgen_state_->llInt<int64_t>(0));
634  }
635  const auto it_valid_bb = llvm::BasicBlock::Create(
636  cgen_state_->context_, "it_valid", cgen_state_->current_func_);
637  const auto it_not_valid_bb = llvm::BasicBlock::Create(
638  cgen_state_->context_, "it_not_valid", cgen_state_->current_func_);
639  cgen_state_->ir_builder_.CreateCondBr(is_valid_it, it_valid_bb, it_not_valid_bb);
640  const auto row_is_deleted_bb = llvm::BasicBlock::Create(
641  cgen_state_->context_, "row_is_deleted", cgen_state_->current_func_);
642  cgen_state_->ir_builder_.SetInsertPoint(it_valid_bb);
643  CodeGenerator code_generator(this);
644  const auto row_is_deleted = code_generator.toBool(
645  code_generator.codegen(deleted_expr.get(), true, co).front());
646  cgen_state_->ir_builder_.CreateBr(row_is_deleted_bb);
647  cgen_state_->ir_builder_.SetInsertPoint(it_not_valid_bb);
648  const auto row_is_deleted_default = cgen_state_->llBool(false);
649  cgen_state_->ir_builder_.CreateBr(row_is_deleted_bb);
650  cgen_state_->ir_builder_.SetInsertPoint(row_is_deleted_bb);
651  auto row_is_deleted_or_default =
652  cgen_state_->ir_builder_.CreatePHI(row_is_deleted->getType(), 2);
653  row_is_deleted_or_default->addIncoming(row_is_deleted, it_valid_bb);
654  row_is_deleted_or_default->addIncoming(row_is_deleted_default, it_not_valid_bb);
655  return row_is_deleted_or_default;
656  };
657 }

std::vector< JoinLoop > Executor::buildJoinLoops ( RelAlgExecutionUnit &  ra_exe_unit,
const CompilationOptions &  co,
const ExecutionOptions &  eo,
const std::vector< InputTableInfo > &  query_infos,
ColumnCacheMap &  column_cache 
)
private

Definition at line 279 of file IRCodegen.cpp.

References AUTOMATIC_IR_METADATA, CodeGenerator::cgen_state_, CHECK, CHECK_LT, CodeGenerator::codegen(), INJECT_TIMER, CgenState::ir_builder_, RelAlgExecutionUnit::join_quals, LEFT, CgenState::llBool(), OneToOne, CgenState::outer_join_match_found_per_level_, Set, Singleton, JoinLoopDomain::slot_lookup_result, CodeGenerator::toBool(), JoinCondition::type, and JoinLoopDomain::values_buffer.

284  {
285  AUTOMATIC_IR_METADATA(cgen_state_.get());
286  INJECT_TIMER(buildJoinLoops);
287  std::vector<JoinLoop> join_loops;
288  for (size_t level_idx = 0, current_hash_table_idx = 0;
289  level_idx < ra_exe_unit.join_quals.size();
290  ++level_idx) {
291  const auto& current_level_join_conditions = ra_exe_unit.join_quals[level_idx];
292  std::vector<std::string> fail_reasons;
293  const auto build_cur_level_hash_table = [&]() {
294  if (current_level_join_conditions.quals.size() > 1) {
295  const auto first_qual = *current_level_join_conditions.quals.begin();
296  auto qual_bin_oper =
297  std::dynamic_pointer_cast<const Analyzer::BinOper>(first_qual);
298  if (qual_bin_oper && qual_bin_oper->is_overlaps_oper() &&
299  current_level_join_conditions.type == JoinType::LEFT) {
300  JoinCondition join_condition{{first_qual}, current_level_join_conditions.type};
301 
302  return buildCurrentLevelHashTable(
303  join_condition, ra_exe_unit, co, query_infos, column_cache, fail_reasons);
304  }
305  }
306  return buildCurrentLevelHashTable(current_level_join_conditions,
307  ra_exe_unit,
308  co,
309  query_infos,
310  column_cache,
311  fail_reasons);
312  };
313  const auto current_level_hash_table = build_cur_level_hash_table();
314  const auto found_outer_join_matches_cb =
315  [this, level_idx](llvm::Value* found_outer_join_matches) {
316  CHECK_LT(level_idx, cgen_state_->outer_join_match_found_per_level_.size());
317  CHECK(!cgen_state_->outer_join_match_found_per_level_[level_idx]);
318  cgen_state_->outer_join_match_found_per_level_[level_idx] =
319  found_outer_join_matches;
320  };
321  const auto is_deleted_cb = buildIsDeletedCb(ra_exe_unit, level_idx, co);
322  const auto outer_join_condition_multi_quals_cb =
323  [this, level_idx, &co, &current_level_join_conditions](
324  const std::vector<llvm::Value*>& prev_iters) {
325  // The values generated for the match path don't dominate all uses
326  // since on the non-match path nulls are generated. Reset the cache
327  // once the condition is generated to avoid incorrect reuse.
328  FetchCacheAnchor anchor(cgen_state_.get());
329  addJoinLoopIterator(prev_iters, level_idx + 1);
330  llvm::Value* left_join_cond = cgen_state_->llBool(true);
331  CodeGenerator code_generator(this);
332  // Do not want to look at all quals! only 1..N quals (ignore first qual)
333  // Note(jclay): this may need to support cases larger than 2
334  // are there any?
335  if (current_level_join_conditions.quals.size() >= 2) {
336  auto qual_it = std::next(current_level_join_conditions.quals.begin(), 1);
337  for (; qual_it != current_level_join_conditions.quals.end();
338  std::advance(qual_it, 1)) {
339  left_join_cond = cgen_state_->ir_builder_.CreateAnd(
340  left_join_cond,
341  code_generator.toBool(
342  code_generator.codegen(qual_it->get(), true, co).front()));
343  }
344  }
345  return left_join_cond;
346  };
347  if (current_level_hash_table) {
348  const auto hoisted_filters_cb = buildHoistLeftHandSideFiltersCb(
349  ra_exe_unit, level_idx, current_level_hash_table->getInnerTableId(), co);
350  if (current_level_hash_table->getHashType() == HashType::OneToOne) {
351  join_loops.emplace_back(
352  /*kind=*/JoinLoopKind::Singleton,
353  /*type=*/current_level_join_conditions.type,
354  /*iteration_domain_codegen=*/
355  [this, current_hash_table_idx, level_idx, current_level_hash_table, &co](
356  const std::vector<llvm::Value*>& prev_iters) {
357  addJoinLoopIterator(prev_iters, level_idx);
358  JoinLoopDomain domain{{0}};
359  domain.slot_lookup_result =
360  current_level_hash_table->codegenSlot(co, current_hash_table_idx);
361  return domain;
362  },
363  /*outer_condition_match=*/nullptr,
364  /*found_outer_matches=*/current_level_join_conditions.type == JoinType::LEFT
365  ? std::function<void(llvm::Value*)>(found_outer_join_matches_cb)
366  : nullptr,
367  /*hoisted_filters=*/hoisted_filters_cb,
368  /*is_deleted=*/is_deleted_cb);
369  } else {
370  join_loops.emplace_back(
371  /*kind=*/JoinLoopKind::Set,
372  /*type=*/current_level_join_conditions.type,
373  /*iteration_domain_codegen=*/
374  [this, current_hash_table_idx, level_idx, current_level_hash_table, &co](
375  const std::vector<llvm::Value*>& prev_iters) {
376  addJoinLoopIterator(prev_iters, level_idx);
377  JoinLoopDomain domain{{0}};
378  const auto matching_set = current_level_hash_table->codegenMatchingSet(
379  co, current_hash_table_idx);
380  domain.values_buffer = matching_set.elements;
381  domain.element_count = matching_set.count;
382  return domain;
383  },
384  /*outer_condition_match=*/
385  current_level_join_conditions.type == JoinType::LEFT
386  ? std::function<llvm::Value*(const std::vector<llvm::Value*>&)>(
387  outer_join_condition_multi_quals_cb)
388  : nullptr,
389  /*found_outer_matches=*/current_level_join_conditions.type == JoinType::LEFT
390  ? std::function<void(llvm::Value*)>(found_outer_join_matches_cb)
391  : nullptr,
392  /*hoisted_filters=*/hoisted_filters_cb,
393  /*is_deleted=*/is_deleted_cb);
394  }
395  ++current_hash_table_idx;
396  } else {
397  const auto fail_reasons_str = current_level_join_conditions.quals.empty()
398  ? "No equijoin expression found"
399  : boost::algorithm::join(fail_reasons, " | ");
400  check_if_loop_join_is_allowed(
401  ra_exe_unit, eo, query_infos, level_idx, fail_reasons_str);
402  // Callback provided to the `JoinLoop` framework to evaluate the (outer) join
403  // condition.
404  VLOG(1) << "Unable to build hash table, falling back to loop join: "
405  << fail_reasons_str;
406  const auto outer_join_condition_cb =
407  [this, level_idx, &co, &current_level_join_conditions](
408  const std::vector<llvm::Value*>& prev_iters) {
409  // The values generated for the match path don't dominate all uses
410  // since on the non-match path nulls are generated. Reset the cache
411  // once the condition is generated to avoid incorrect reuse.
412  FetchCacheAnchor anchor(cgen_state_.get());
413  addJoinLoopIterator(prev_iters, level_idx + 1);
414  llvm::Value* left_join_cond = cgen_state_->llBool(true);
415  CodeGenerator code_generator(this);
416  for (auto expr : current_level_join_conditions.quals) {
417  left_join_cond = cgen_state_->ir_builder_.CreateAnd(
418  left_join_cond,
419  code_generator.toBool(
420  code_generator.codegen(expr.get(), true, co).front()));
421  }
422  return left_join_cond;
423  };
424  join_loops.emplace_back(
425  /*kind=*/JoinLoopKind::UpperBound,
426  /*type=*/current_level_join_conditions.type,
427  /*iteration_domain_codegen=*/
428  [this, level_idx](const std::vector<llvm::Value*>& prev_iters) {
429  addJoinLoopIterator(prev_iters, level_idx);
430  JoinLoopDomain domain{{0}};
431  const auto rows_per_scan_ptr = cgen_state_->ir_builder_.CreateGEP(
432  get_arg_by_name(cgen_state_->row_func_, "num_rows_per_scan"),
433  cgen_state_->llInt(int32_t(level_idx + 1)));
434  domain.upper_bound = cgen_state_->ir_builder_.CreateLoad(rows_per_scan_ptr,
435  "num_rows_per_scan");
436  return domain;
437  },
438  /*outer_condition_match=*/
439  current_level_join_conditions.type == JoinType::LEFT
440  ? std::function<llvm::Value*(const std::vector<llvm::Value*>&)>(
441  outer_join_condition_cb)
442  : nullptr,
443  /*found_outer_matches=*/
444  current_level_join_conditions.type == JoinType::LEFT
445  ? std::function<void(llvm::Value*)>(found_outer_join_matches_cb)
446  : nullptr,
447  /*hoisted_filters=*/nullptr,
448  /*is_deleted=*/is_deleted_cb);
449  }
450  }
451  return join_loops;
452 }
std::shared_ptr< HashJoin > buildCurrentLevelHashTable(const JoinCondition &current_level_join_conditions, RelAlgExecutionUnit &ra_exe_unit, const CompilationOptions &co, const std::vector< InputTableInfo > &query_infos, ColumnCacheMap &column_cache, std::vector< std::string > &fail_reasons)
Definition: IRCodegen.cpp:659
llvm::Value * values_buffer
Definition: JoinLoop.h:48
std::string join(T const &container, std::string const &delim)
std::unique_ptr< CgenState > cgen_state_
Definition: Execute.h:1009
llvm::Value * get_arg_by_name(llvm::Function *func, const std::string &name)
Definition: Execute.h:167
#define INJECT_TIMER(DESC)
Definition: measure.h:93
const JoinQualsPerNestingLevel join_quals
#define AUTOMATIC_IR_METADATA(CGENSTATE)
llvm::Value * slot_lookup_result
Definition: JoinLoop.h:46
#define CHECK_LT(x, y)
Definition: Logger.h:213
llvm::Value * addJoinLoopIterator(const std::vector< llvm::Value * > &prev_iters, const size_t level_idx)
Definition: IRCodegen.cpp:806
#define CHECK(condition)
Definition: Logger.h:203
void check_if_loop_join_is_allowed(RelAlgExecutionUnit &ra_exe_unit, const ExecutionOptions &eo, const std::vector< InputTableInfo > &query_infos, const size_t level_idx, const std::string &fail_reason)
Definition: IRCodegen.cpp:240
JoinLoop::HoistedFiltersCallback buildHoistLeftHandSideFiltersCb(const RelAlgExecutionUnit &ra_exe_unit, const size_t level_idx, const int inner_table_id, const CompilationOptions &co)
Definition: IRCodegen.cpp:492
std::vector< JoinLoop > buildJoinLoops(RelAlgExecutionUnit &ra_exe_unit, const CompilationOptions &co, const ExecutionOptions &eo, const std::vector< InputTableInfo > &query_infos, ColumnCacheMap &column_cache)
Definition: IRCodegen.cpp:279
std::function< llvm::Value *(const std::vector< llvm::Value * > &, llvm::Value *)> buildIsDeletedCb(const RelAlgExecutionUnit &ra_exe_unit, const size_t level_idx, const CompilationOptions &co)
Definition: IRCodegen.cpp:600
#define VLOG(n)
Definition: Logger.h:297
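The branching above boils down to a per-level choice between three JoinLoopKind values. A minimal sketch of that decision, assuming the relevant OmniSciDB headers (HashJoin, JoinLoop) are already included; the helper name is hypothetical and only restates the selection performed by buildJoinLoops:

// Illustrative condensation of the kind selection above (not part of the Executor API).
JoinLoopKind pick_join_loop_kind(const std::shared_ptr<HashJoin>& hash_table) {
  if (!hash_table) {
    return JoinLoopKind::UpperBound;    // no hash table: fall back to a loop join
  }
  return hash_table->getHashType() == HashType::OneToOne
             ? JoinLoopKind::Singleton  // at most one matching inner row per probe
             : JoinLoopKind::Set;       // a set of matching inner rows per probe
}

For LEFT joins, found_outer_join_matches_cb and the outer-join condition callbacks are additionally attached so the non-match path can emit nulls.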

void Executor::buildSelectedFragsMapping ( std::vector< std::vector< size_t >> &  selected_fragments_crossjoin,
std::vector< size_t > &  local_col_to_frag_pos,
const std::list< std::shared_ptr< const InputColDescriptor >> &  col_global_ids,
const FragmentsList selected_fragments,
const RelAlgExecutionUnit ra_exe_unit 
)
private

Definition at line 2758 of file Execute.cpp.

References CHECK, CHECK_EQ, CHECK_LT, and RelAlgExecutionUnit::input_descs.

2763  {
2764  local_col_to_frag_pos.resize(plan_state_->global_to_local_col_ids_.size());
2765  size_t frag_pos{0};
2766  const auto& input_descs = ra_exe_unit.input_descs;
2767  for (size_t scan_idx = 0; scan_idx < input_descs.size(); ++scan_idx) {
2768  const int table_id = input_descs[scan_idx].getTableId();
2769  CHECK_EQ(selected_fragments[scan_idx].table_id, table_id);
2770  selected_fragments_crossjoin.push_back(
2771  getFragmentCount(selected_fragments, scan_idx, ra_exe_unit));
2772  for (const auto& col_id : col_global_ids) {
2773  CHECK(col_id);
2774  const auto& input_desc = col_id->getScanDesc();
2775  if (input_desc.getTableId() != table_id ||
2776  input_desc.getNestLevel() != static_cast<int>(scan_idx)) {
2777  continue;
2778  }
2779  auto it = plan_state_->global_to_local_col_ids_.find(*col_id);
2780  CHECK(it != plan_state_->global_to_local_col_ids_.end());
2781  CHECK_LT(static_cast<size_t>(it->second),
2782  plan_state_->global_to_local_col_ids_.size());
2783  local_col_to_frag_pos[it->second] = frag_pos;
2784  }
2785  ++frag_pos;
2786  }
2787 }
#define CHECK_EQ(x, y)
Definition: Logger.h:211
std::vector< InputDescriptor > input_descs
std::unique_ptr< PlanState > plan_state_
Definition: Execute.h:1024
#define CHECK_LT(x, y)
Definition: Logger.h:213
std::vector< size_t > getFragmentCount(const FragmentsList &selected_fragments, const size_t scan_idx, const RelAlgExecutionUnit &ra_exe_unit)
Definition: Execute.cpp:2744
#define CHECK(condition)
Definition: Logger.h:203
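A hedged worked example of the outputs (table ids and nest levels below are hypothetical):

// Two-table join: table 10 at nest level 0, table 20 at nest level 1.
//   selected_fragments_crossjoin -> one vector per scan, produced by
//                                   getFragmentCount(selected_fragments, scan_idx, ra_exe_unit)
//   local_col_to_frag_pos        -> 0 for every local column id of table 10,
//                                   1 for every local column id of table 20
// i.e. each local column id is tagged with the position (frag_pos) of its scan in the
// fragment cross join, so fetch code can pair columns with the right fragment list.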
void Executor::buildSelectedFragsMappingForUnion ( std::vector< std::vector< size_t >> &  selected_fragments_crossjoin,
std::vector< size_t > &  local_col_to_frag_pos,
const std::list< std::shared_ptr< const InputColDescriptor >> &  col_global_ids,
const FragmentsList selected_fragments,
const RelAlgExecutionUnit ra_exe_unit 
)
private

Definition at line 2789 of file Execute.cpp.

References CHECK, CHECK_LT, and RelAlgExecutionUnit::input_descs.

2794  {
2795  local_col_to_frag_pos.resize(plan_state_->global_to_local_col_ids_.size());
2796  size_t frag_pos{0};
2797  const auto& input_descs = ra_exe_unit.input_descs;
2798  for (size_t scan_idx = 0; scan_idx < input_descs.size(); ++scan_idx) {
2799  const int table_id = input_descs[scan_idx].getTableId();
2800  // selected_fragments here is from assignFragsToKernelDispatch
2801  // execution_kernel.fragments
2802  if (selected_fragments[0].table_id != table_id) { // TODO 0
2803  continue;
2804  }
2805  // CHECK_EQ(selected_fragments[scan_idx].table_id, table_id);
2806  selected_fragments_crossjoin.push_back(
2807  // getFragmentCount(selected_fragments, scan_idx, ra_exe_unit));
2808  {size_t(1)}); // TODO
2809  for (const auto& col_id : col_global_ids) {
2810  CHECK(col_id);
2811  const auto& input_desc = col_id->getScanDesc();
2812  if (input_desc.getTableId() != table_id ||
2813  input_desc.getNestLevel() != static_cast<int>(scan_idx)) {
2814  continue;
2815  }
2816  auto it = plan_state_->global_to_local_col_ids_.find(*col_id);
2817  CHECK(it != plan_state_->global_to_local_col_ids_.end());
2818  CHECK_LT(static_cast<size_t>(it->second),
2819  plan_state_->global_to_local_col_ids_.size());
2820  local_col_to_frag_pos[it->second] = frag_pos;
2821  }
2822  ++frag_pos;
2823  }
2824 }
std::vector< InputDescriptor > input_descs
std::unique_ptr< PlanState > plan_state_
Definition: Execute.h:1024
#define CHECK_LT(x, y)
Definition: Logger.h:213
#define CHECK(condition)
Definition: Logger.h:203
llvm::Value * Executor::castToFP ( llvm::Value *  value,
SQLTypeInfo const from_ti,
SQLTypeInfo const to_ti 
)
private

Definition at line 3363 of file Execute.cpp.

References AUTOMATIC_IR_METADATA, exp_to_scale(), logger::FATAL, SQLTypeInfo::get_scale(), SQLTypeInfo::get_size(), SQLTypeInfo::is_fp(), SQLTypeInfo::is_number(), and LOG.

3365  {
3367  if (value->getType()->isIntegerTy() && from_ti.is_number() && to_ti.is_fp() &&
3368  (!from_ti.is_fp() || from_ti.get_size() != to_ti.get_size())) {
3369  llvm::Type* fp_type{nullptr};
3370  switch (to_ti.get_size()) {
3371  case 4:
3372  fp_type = llvm::Type::getFloatTy(cgen_state_->context_);
3373  break;
3374  case 8:
3375  fp_type = llvm::Type::getDoubleTy(cgen_state_->context_);
3376  break;
3377  default:
3378  LOG(FATAL) << "Unsupported FP size: " << to_ti.get_size();
3379  }
3380  value = cgen_state_->ir_builder_.CreateSIToFP(value, fp_type);
3381  if (from_ti.get_scale()) {
3382  value = cgen_state_->ir_builder_.CreateFDiv(
3383  value,
3384  llvm::ConstantFP::get(value->getType(), exp_to_scale(from_ti.get_scale())));
3385  }
3386  }
3387  return value;
3388 }
#define LOG(tag)
Definition: Logger.h:194
std::unique_ptr< CgenState > cgen_state_
Definition: Execute.h:1009
#define AUTOMATIC_IR_METADATA(CGENSTATE)
uint64_t exp_to_scale(const unsigned exp)
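For decimal sources the generated arithmetic is an integer-to-FP conversion followed by a divide by the scale factor. A semantic sketch for casting a DECIMAL(10,2) value to DOUBLE (values hypothetical; the real code emits this via CreateSIToFP and CreateFDiv):

//   int64_t v = 12345;                                      // stored integer, scaled by 10^2
//   double out = static_cast<double>(v) / exp_to_scale(2);  // exp_to_scale(2) == 100, out == 123.45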

llvm::Value * Executor::castToIntPtrTyIn ( llvm::Value *  val,
const size_t  bit_width 
)
private

Definition at line 3390 of file Execute.cpp.

References AUTOMATIC_IR_METADATA, CHECK, CHECK_LT, and get_int_type().

3390  {
3392  CHECK(val->getType()->isPointerTy());
3393 
3394  const auto val_ptr_type = static_cast<llvm::PointerType*>(val->getType());
3395  const auto val_type = val_ptr_type->getElementType();
3396  size_t val_width = 0;
3397  if (val_type->isIntegerTy()) {
3398  val_width = val_type->getIntegerBitWidth();
3399  } else {
3400  if (val_type->isFloatTy()) {
3401  val_width = 32;
3402  } else {
3403  CHECK(val_type->isDoubleTy());
3404  val_width = 64;
3405  }
3406  }
3407  CHECK_LT(size_t(0), val_width);
3408  if (bitWidth == val_width) {
3409  return val;
3410  }
3411  return cgen_state_->ir_builder_.CreateBitCast(
3412  val, llvm::PointerType::get(get_int_type(bitWidth, cgen_state_->context_), 0));
3413 }
std::unique_ptr< CgenState > cgen_state_
Definition: Execute.h:1009
llvm::Type * get_int_type(const int width, llvm::LLVMContext &context)
#define AUTOMATIC_IR_METADATA(CGENSTATE)
#define CHECK_LT(x, y)
Definition: Logger.h:213
#define CHECK(condition)
Definition: Logger.h:203
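In effect the pointer is reinterpreted as a pointer to an integer of the requested width, and returned unchanged when the widths already agree. Two illustrative cases (buffer types hypothetical):

//   float* buffer, bit_width == 64: val_width is 32, so CreateBitCast produces an i64* view.
//   int64_t* buffer, bit_width == 64: val_width equals bit_width, so the original pointer is returned.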

bool Executor::checkCurrentQuerySession ( const std::string &  candidate_query_session,
mapd_shared_lock< mapd_shared_mutex > &  read_lock 
)

Definition at line 3840 of file Execute.cpp.

3841  {
3842  // the candidate session matches only when it is non-empty and equal to
3843  // the current query session
3844  return !candidate_query_session.empty() &&
3845  (current_query_session_ == candidate_query_session);
3846 }
static QuerySessionId current_query_session_
Definition: Execute.h:1070
bool Executor::checkIsQuerySessionEnrolled ( const QuerySessionId query_session,
mapd_shared_lock< mapd_shared_mutex > &  read_lock 
)

Definition at line 4143 of file Execute.cpp.

4145  {
4146  if (query_session.empty()) {
4147  return false;
4148  }
4149  return !query_session.empty() && queries_session_map_.count(query_session);
4150 }
static QuerySessionMap queries_session_map_
Definition: Execute.h:1076
bool Executor::checkIsQuerySessionInterrupted ( const std::string &  query_session,
mapd_shared_lock< mapd_shared_mutex > &  read_lock 
)

Definition at line 4127 of file Execute.cpp.

4129  {
4130  if (query_session.empty()) {
4131  return false;
4132  }
4133  auto flag_it = queries_interrupt_flag_.find(query_session);
4134  return !query_session.empty() && flag_it != queries_interrupt_flag_.end() &&
4135  flag_it->second;
4136 }
static InterruptFlagMap queries_interrupt_flag_
Definition: Execute.h:1074
bool Executor::checkIsRunningQuerySessionInterrupted ( )

Definition at line 4138 of file Execute.cpp.

4138  {
4139  mapd_shared_lock<mapd_shared_mutex> session_read_lock(executor_session_mutex_);
4140  return checkIsQuerySessionInterrupted(current_query_session_, session_read_lock);
4141 }
static mapd_shared_mutex executor_session_mutex_
Definition: Execute.h:1068
bool checkIsQuerySessionInterrupted(const std::string &query_session, mapd_shared_lock< mapd_shared_mutex > &read_lock)
Definition: Execute.cpp:4127
static QuerySessionId current_query_session_
Definition: Execute.h:1070
void Executor::checkPendingQueryStatus ( const QuerySessionId query_session)

Definition at line 3871 of file Execute.cpp.

References ERR_INTERRUPTED, and VLOG.

3871  {
3872  // check whether we are okay to execute the "pending" query
3873  // i.e., before running the query check if this query session is "ALREADY" interrupted
3874  mapd_unique_lock<mapd_shared_mutex> session_write_lock(executor_session_mutex_);
3875  if (query_session.empty()) {
3876  return;
3877  }
3878  if (queries_interrupt_flag_.find(query_session) == queries_interrupt_flag_.end()) {
3879  // something went wrong: it is the caller's responsibility to invoke this
3880  // function only for an enrolled query session
3881  if (!queries_session_map_.count(query_session)) {
3882  VLOG(1) << "Interrupting pending query is not available since the query session is "
3883  "not enrolled";
3884  } else {
3885  // here the query session is enrolled but the interrupt flag is not registered
3886  VLOG(1)
3887  << "Interrupting pending query is not available since its interrupt flag is "
3888  "not registered";
3889  }
3890  return;
3891  }
3892  if (queries_interrupt_flag_[query_session]) {
3894  }
3895 }
static mapd_shared_mutex executor_session_mutex_
Definition: Execute.h:1068
static QuerySessionMap queries_session_map_
Definition: Execute.h:1076
static const int32_t ERR_INTERRUPTED
Definition: Execute.h:1116
static InterruptFlagMap queries_interrupt_flag_
Definition: Execute.h:1074
#define VLOG(n)
Definition: Logger.h:297
void Executor::clearMemory ( const Data_Namespace::MemoryLevel  memory_level)
static

Definition at line 185 of file Execute.cpp.

References Data_Namespace::DataMgr::clearMemory(), Data_Namespace::CPU_LEVEL, Catalog_Namespace::SysCatalog::getDataMgr(), Data_Namespace::GPU_LEVEL, Catalog_Namespace::SysCatalog::instance(), and CacheInvalidator< CACHE_HOLDING_TYPES >::invalidateCaches().

Referenced by DBHandler::clear_cpu_memory(), DBHandler::clear_gpu_memory(), QueryRunner::QueryRunner::clearCpuMemory(), and QueryRunner::QueryRunner::clearGpuMemory().

185  {
186  switch (memory_level) {
187  case Data_Namespace::MemoryLevel::CPU_LEVEL:
188  case Data_Namespace::MemoryLevel::GPU_LEVEL: {
189  mapd_unique_lock<mapd_shared_mutex> flush_lock(
190  execute_mutex_); // Don't flush memory while queries are running
191 
192  Catalog_Namespace::SysCatalog::instance().getDataMgr().clearMemory(memory_level);
193  if (memory_level == Data_Namespace::MemoryLevel::CPU_LEVEL) {
194  // The hash table cache uses CPU memory not managed by the buffer manager. In the
195  // future, we should manage these allocations with the buffer manager directly.
196  // For now, assume the user wants to purge the hash table cache when they clear
197  // CPU memory (currently used in ExecuteTest to lower memory pressure)
199  }
200  break;
201  }
202  default: {
203  throw std::runtime_error(
204  "Clearing memory levels other than the CPU level or GPU level is not "
205  "supported.");
206  }
207  }
208 }
static mapd_shared_mutex execute_mutex_
Definition: Execute.h:1083
void clearMemory(const MemoryLevel memLevel)
Definition: DataMgr.cpp:384
static void invalidateCaches()
Data_Namespace::DataMgr & getDataMgr() const
Definition: SysCatalog.h:193
static SysCatalog & instance()
Definition: SysCatalog.h:292
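A minimal usage sketch mirroring the callers listed above (assumes Execute.h is on the include path; the wrapper function is hypothetical):

#include <Execute.h>

void flush_cpu_buffers() {
  // Flushes the CPU buffer pool; per the comment in the definition this also
  // invalidates the hash table cache. Levels other than CPU_LEVEL/GPU_LEVEL throw.
  Executor::clearMemory(Data_Namespace::MemoryLevel::CPU_LEVEL);
}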

void Executor::clearMetaInfoCache ( )
private

Definition at line 377 of file Execute.cpp.

References input_table_info_cache_().

377  {
378  input_table_info_cache_.clear();
379  agg_col_range_cache_.clear();
380  table_generations_.clear();
381 }
AggregatedColRange agg_col_range_cache_
Definition: Execute.h:1066
InputTableInfoCache input_table_info_cache_
Definition: Execute.h:1065
TableGenerations table_generations_
Definition: Execute.h:1067

void Executor::clearQuerySessionStatus ( const QuerySessionId query_session,
const std::string &  submitted_time_str,
const bool  acquire_spin_lock 
)

Definition at line 3897 of file Execute.cpp.

References executor_id_().

3899  {
3900  mapd_unique_lock<mapd_shared_mutex> session_write_lock(executor_session_mutex_);
3901  // clear the interrupt-related info for a finished query
3902  if (query_session.empty()) {
3903  return;
3904  }
3905  removeFromQuerySessionList(query_session, submitted_time_str, session_write_lock);
3906  if (query_session.compare(current_query_session_) == 0 &&
3908  invalidateRunningQuerySession(session_write_lock);
3909  if (acquire_spin_lock) {
3910  // release the executor's internal spin lock ("L") iff it was acquired;
3911  // otherwise the "L" lock does not need to be touched
3912  // e.g., table import has no code path through the Executor, so it relies on
3913  // the executor's session management and the global interrupt flag only,
3914  // never on this "L" lock
3915  execute_spin_lock_.clear(std::memory_order_release);
3916  }
3917  resetInterrupt();
3918  }
3919 }
static mapd_shared_mutex executor_session_mutex_
Definition: Execute.h:1068
void invalidateRunningQuerySession(mapd_unique_lock< mapd_shared_mutex > &write_lock)
Definition: Execute.cpp:3848
static std::atomic_flag execute_spin_lock_
Definition: Execute.h:1079
const ExecutorId executor_id_
Definition: Execute.h:1052
static QuerySessionId current_query_session_
Definition: Execute.h:1070
void resetInterrupt()
bool removeFromQuerySessionList(const QuerySessionId &query_session, const std::string &submitted_time_str, mapd_unique_lock< mapd_shared_mutex > &write_lock)
Definition: Execute.cpp:4072
static size_t running_query_executor_id_
Definition: Execute.h:1072

llvm::Value * Executor::codegenAggregateWindowState ( )
private

Definition at line 326 of file WindowFunctionIR.cpp.

References AUTOMATIC_IR_METADATA, AVG, COUNT, anonymous_namespace{WindowFunctionIR.cpp}::get_adjusted_window_type_info(), get_int_type(), WindowProjectNodeContext::getActiveWindowFunctionContext(), Analyzer::WindowFunction::getKind(), kDECIMAL, kDOUBLE, and kFLOAT.

326  {
328  const auto pi32_type =
329  llvm::PointerType::get(get_int_type(32, cgen_state_->context_), 0);
330  const auto pi64_type =
331  llvm::PointerType::get(get_int_type(64, cgen_state_->context_), 0);
332  const auto window_func_context =
334  const Analyzer::WindowFunction* window_func = window_func_context->getWindowFunction();
335  const auto window_func_ti = get_adjusted_window_type_info(window_func);
336  const auto aggregate_state_type =
337  window_func_ti.get_type() == kFLOAT ? pi32_type : pi64_type;
338  auto aggregate_state = aggregateWindowStatePtr();
339  if (window_func->getKind() == SqlWindowFunctionKind::AVG) {
340  const auto aggregate_state_count_i64 = cgen_state_->llInt(
341  reinterpret_cast<const int64_t>(window_func_context->aggregateStateCount()));
342  auto aggregate_state_count = cgen_state_->ir_builder_.CreateIntToPtr(
343  aggregate_state_count_i64, aggregate_state_type);
344  const auto double_null_lv = cgen_state_->inlineFpNull(SQLTypeInfo(kDOUBLE));
345  switch (window_func_ti.get_type()) {
346  case kFLOAT: {
347  return cgen_state_->emitCall(
348  "load_avg_float", {aggregate_state, aggregate_state_count, double_null_lv});
349  }
350  case kDOUBLE: {
351  return cgen_state_->emitCall(
352  "load_avg_double", {aggregate_state, aggregate_state_count, double_null_lv});
353  }
354  case kDECIMAL: {
355  return cgen_state_->emitCall(
356  "load_avg_decimal",
357  {aggregate_state,
358  aggregate_state_count,
359  double_null_lv,
360  cgen_state_->llInt<int32_t>(window_func_ti.get_scale())});
361  }
362  default: {
363  return cgen_state_->emitCall(
364  "load_avg_int", {aggregate_state, aggregate_state_count, double_null_lv});
365  }
366  }
367  }
368  if (window_func->getKind() == SqlWindowFunctionKind::COUNT) {
369  return cgen_state_->ir_builder_.CreateLoad(aggregate_state);
370  }
371  switch (window_func_ti.get_type()) {
372  case kFLOAT: {
373  return cgen_state_->emitCall("load_float", {aggregate_state});
374  }
375  case kDOUBLE: {
376  return cgen_state_->emitCall("load_double", {aggregate_state});
377  }
378  default: {
379  return cgen_state_->ir_builder_.CreateLoad(aggregate_state);
380  }
381  }
382 }
SqlWindowFunctionKind getKind() const
Definition: Analyzer.h:1447
llvm::Value * aggregateWindowStatePtr()
std::unique_ptr< CgenState > cgen_state_
Definition: Execute.h:1009
llvm::Type * get_int_type(const int width, llvm::LLVMContext &context)
static WindowFunctionContext * getActiveWindowFunctionContext(Executor *executor)
#define AUTOMATIC_IR_METADATA(CGENSTATE)
SQLTypeInfo get_adjusted_window_type_info(const Analyzer::WindowFunction *window_func)

void Executor::codegenJoinLoops ( const std::vector< JoinLoop > &  join_loops,
const RelAlgExecutionUnit ra_exe_unit,
GroupByAndAggregate group_by_and_aggregate,
llvm::Function *  query_func,
llvm::BasicBlock *  entry_bb,
const QueryMemoryDescriptor query_mem_desc,
const CompilationOptions co,
const ExecutionOptions eo 
)
private

Definition at line 824 of file IRCodegen.cpp.

References ExecutionOptions::allow_runtime_query_interrupt, AUTOMATIC_IR_METADATA, CodeGenerator::cgen_state_, JoinLoop::codegen(), CgenState::context_, CgenState::current_func_, CompilationOptions::device_type, CgenState::ir_builder_, CgenState::llInt(), CgenState::needs_error_check_, CodeGenerator::posArg(), GroupByAndAggregate::query_infos_, and ExecutionOptions::with_dynamic_watchdog.

831  {
833  const auto exit_bb =
834  llvm::BasicBlock::Create(cgen_state_->context_, "exit", cgen_state_->current_func_);
835  cgen_state_->ir_builder_.SetInsertPoint(exit_bb);
836  cgen_state_->ir_builder_.CreateRet(cgen_state_->llInt<int32_t>(0));
837  cgen_state_->ir_builder_.SetInsertPoint(entry_bb);
838  CodeGenerator code_generator(this);
839  const auto loops_entry_bb = JoinLoop::codegen(
840  join_loops,
841  /*body_codegen=*/
842  [this,
843  query_func,
844  &query_mem_desc,
845  &co,
846  &eo,
847  &group_by_and_aggregate,
848  &join_loops,
849  &ra_exe_unit](const std::vector<llvm::Value*>& prev_iters) {
851  addJoinLoopIterator(prev_iters, join_loops.size());
852  auto& builder = cgen_state_->ir_builder_;
853  const auto loop_body_bb = llvm::BasicBlock::Create(
854  builder.getContext(), "loop_body", builder.GetInsertBlock()->getParent());
855  builder.SetInsertPoint(loop_body_bb);
856  const bool can_return_error =
857  compileBody(ra_exe_unit, group_by_and_aggregate, query_mem_desc, co);
858  if (can_return_error || cgen_state_->needs_error_check_ ||
860  createErrorCheckControlFlow(query_func,
863  co.device_type,
864  group_by_and_aggregate.query_infos_);
865  }
866  return loop_body_bb;
867  },
868  /*outer_iter=*/code_generator.posArg(nullptr),
869  exit_bb,
870  cgen_state_.get());
871  cgen_state_->ir_builder_.SetInsertPoint(entry_bb);
872  cgen_state_->ir_builder_.CreateBr(loops_entry_bb);
873 }
std::unique_ptr< CgenState > cgen_state_
Definition: Execute.h:1009
bool compileBody(const RelAlgExecutionUnit &ra_exe_unit, GroupByAndAggregate &group_by_and_aggregate, const QueryMemoryDescriptor &query_mem_desc, const CompilationOptions &co, const GpuSharedMemoryContext &gpu_smem_context={})
static llvm::BasicBlock * codegen(const std::vector< JoinLoop > &join_loops, const std::function< llvm::BasicBlock *(const std::vector< llvm::Value * > &)> &body_codegen, llvm::Value *outer_iter, llvm::BasicBlock *exit_bb, CgenState *cgen_state)
Definition: JoinLoop.cpp:48
#define AUTOMATIC_IR_METADATA(CGENSTATE)
ExecutorDeviceType device_type
const std::vector< InputTableInfo > & query_infos_
llvm::Value * addJoinLoopIterator(const std::vector< llvm::Value * > &prev_iters, const size_t level_idx)
Definition: IRCodegen.cpp:806
void createErrorCheckControlFlow(llvm::Function *query_func, bool run_with_dynamic_watchdog, bool run_with_allowing_runtime_interrupt, ExecutorDeviceType device_type, const std::vector< InputTableInfo > &input_table_infos)
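The net effect is a query function whose entry block branches into the nested join loops, with a shared per-row body and a common exit block. A rough, illustrative shape of the generated control flow (block names taken from the code above):

//   entry:       br label %<loops entry built by JoinLoop::codegen>
//   ...nested iteration code, one level per JoinLoop...
//   loop_body:   filters, group-by and aggregates emitted by compileBody();
//                error-check control flow is added when the body can return an
//                error, a dynamic watchdog is set, or runtime interrupt is allowed
//   exit:        ret i32 0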

llvm::BasicBlock * Executor::codegenSkipDeletedOuterTableRow ( const RelAlgExecutionUnit ra_exe_unit,
const CompilationOptions co 
)
private

Definition at line 2951 of file NativeCodegen.cpp.

2953  {
2955  if (!co.filter_on_deleted_column) {
2956  return nullptr;
2957  }
2958  CHECK(!ra_exe_unit.input_descs.empty());
2959  const auto& outer_input_desc = ra_exe_unit.input_descs[0];
2960  if (outer_input_desc.getSourceType() != InputSourceType::TABLE) {
2961  return nullptr;
2962  }
2963  const auto deleted_cd =
2964  plan_state_->getDeletedColForTable(outer_input_desc.getTableId());
2965  if (!deleted_cd) {
2966  return nullptr;
2967  }
2968  CHECK(deleted_cd->columnType.is_boolean());
2969  const auto deleted_expr =
2970  makeExpr<Analyzer::ColumnVar>(deleted_cd->columnType,
2971  outer_input_desc.getTableId(),
2972  deleted_cd->columnId,
2973  outer_input_desc.getNestLevel());
2974  CodeGenerator code_generator(this);
2975  const auto is_deleted =
2976  code_generator.toBool(code_generator.codegen(deleted_expr.get(), true, co).front());
2977  const auto is_deleted_bb = llvm::BasicBlock::Create(
2978  cgen_state_->context_, "is_deleted", cgen_state_->row_func_);
2979  llvm::BasicBlock* bb = llvm::BasicBlock::Create(
2980  cgen_state_->context_, "is_not_deleted", cgen_state_->row_func_);
2981  cgen_state_->ir_builder_.CreateCondBr(is_deleted, is_deleted_bb, bb);
2982  cgen_state_->ir_builder_.SetInsertPoint(is_deleted_bb);
2983  cgen_state_->ir_builder_.CreateRet(cgen_state_->llInt<int32_t>(0));
2984  cgen_state_->ir_builder_.SetInsertPoint(bb);
2985  return bb;
2986 }
std::vector< InputDescriptor > input_descs
std::unique_ptr< CgenState > cgen_state_
Definition: Execute.h:1009
std::unique_ptr< PlanState > plan_state_
Definition: Execute.h:1024
#define AUTOMATIC_IR_METADATA(CGENSTATE)
#define CHECK(condition)
Definition: Logger.h:203
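The emitted control flow simply returns early for deleted rows and hands back the block where surviving rows continue. An illustrative shape (the boolean column is the table's delete marker):

//   %deleted = <codegen of the outer table's deleted column, via toBool()>
//   br i1 %deleted, label %is_deleted, label %is_not_deleted
// is_deleted:
//   ret i32 0                      ; skip the row
// is_not_deleted:
//   ; returned BasicBlock: the remainder of the row function is generated here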
void Executor::codegenWindowAvgEpilogue ( llvm::Value *  crt_val,
llvm::Value *  window_func_null_val,
llvm::Value *  multiplicity_lv 
)
private

Definition at line 289 of file WindowFunctionIR.cpp.

References AUTOMATIC_IR_METADATA, anonymous_namespace{WindowFunctionIR.cpp}::get_adjusted_window_type_info(), get_int_type(), WindowProjectNodeContext::getActiveWindowFunctionContext(), kDOUBLE, and kFLOAT.

291  {
293  const auto window_func_context =
295  const auto window_func = window_func_context->getWindowFunction();
296  const auto window_func_ti = get_adjusted_window_type_info(window_func);
297  const auto pi32_type =
298  llvm::PointerType::get(get_int_type(32, cgen_state_->context_), 0);
299  const auto pi64_type =
300  llvm::PointerType::get(get_int_type(64, cgen_state_->context_), 0);
301  const auto aggregate_state_type =
302  window_func_ti.get_type() == kFLOAT ? pi32_type : pi64_type;
303  const auto aggregate_state_count_i64 = cgen_state_->llInt(
304  reinterpret_cast<const int64_t>(window_func_context->aggregateStateCount()));
305  auto aggregate_state_count = cgen_state_->ir_builder_.CreateIntToPtr(
306  aggregate_state_count_i64, aggregate_state_type);
307  std::string agg_count_func_name = "agg_count";
308  switch (window_func_ti.get_type()) {
309  case kFLOAT: {
310  agg_count_func_name += "_float";
311  break;
312  }
313  case kDOUBLE: {
314  agg_count_func_name += "_double";
315  break;
316  }
317  default: {
318  break;
319  }
320  }
321  agg_count_func_name += "_skip_val";
322  cgen_state_->emitCall(agg_count_func_name,
323  {aggregate_state_count, crt_val, window_func_null_val});
324 }
std::unique_ptr< CgenState > cgen_state_
Definition: Execute.h:1009
llvm::Type * get_int_type(const int width, llvm::LLVMContext &context)
static WindowFunctionContext * getActiveWindowFunctionContext(Executor *executor)
#define AUTOMATIC_IR_METADATA(CGENSTATE)
SQLTypeInfo get_adjusted_window_type_info(const Analyzer::WindowFunction *window_func)

llvm::Value * Executor::codegenWindowFunction ( const size_t  target_index,
const CompilationOptions co 
)
private

Definition at line 21 of file WindowFunctionIR.cpp.

References WindowProjectNodeContext::activateWindowFunctionContext(), run_benchmark_import::args, AUTOMATIC_IR_METADATA, AVG, CHECK, CHECK_EQ, COUNT, CUME_DIST, DENSE_RANK, logger::FATAL, FIRST_VALUE, WindowProjectNodeContext::get(), WindowFunctionContext::getWindowFunction(), LAG, LAST_VALUE, LEAD, LOG, MAX, MIN, NTILE, PERCENT_RANK, RANK, ROW_NUMBER, and SUM.

22  {
24  CodeGenerator code_generator(this);
25  const auto window_func_context =
27  target_index);
28  const auto window_func = window_func_context->getWindowFunction();
29  switch (window_func->getKind()) {
34  return cgen_state_->emitCall("row_number_window_func",
35  {cgen_state_->llInt(reinterpret_cast<const int64_t>(
36  window_func_context->output())),
37  code_generator.posArg(nullptr)});
38  }
41  return cgen_state_->emitCall("percent_window_func",
42  {cgen_state_->llInt(reinterpret_cast<const int64_t>(
43  window_func_context->output())),
44  code_generator.posArg(nullptr)});
45  }
51  const auto& args = window_func->getArgs();
52  CHECK(!args.empty());
53  const auto arg_lvs = code_generator.codegen(args.front().get(), true, co);
54  CHECK_EQ(arg_lvs.size(), size_t(1));
55  return arg_lvs.front();
56  }
63  }
64  default: {
65  LOG(FATAL) << "Invalid window function kind";
66  }
67  }
68  return nullptr;
69 }
#define CHECK_EQ(x, y)
Definition: Logger.h:211
#define LOG(tag)
Definition: Logger.h:194
std::unique_ptr< CgenState > cgen_state_
Definition: Execute.h:1009
static const WindowProjectNodeContext * get(Executor *executor)
const WindowFunctionContext * activateWindowFunctionContext(Executor *executor, const size_t target_index) const
#define AUTOMATIC_IR_METADATA(CGENSTATE)
llvm::Value * codegenWindowFunctionAggregate(const CompilationOptions &co)
#define CHECK(condition)
Definition: Logger.h:203
const Analyzer::WindowFunction * getWindowFunction() const

llvm::Value * Executor::codegenWindowFunctionAggregate ( const CompilationOptions co)
private

Definition at line 140 of file WindowFunctionIR.cpp.

References AUTOMATIC_IR_METADATA, AVG, CHECK, WindowProjectNodeContext::get(), get_int_type(), and WindowProjectNodeContext::getActiveWindowFunctionContext().

140  {
142  const auto reset_state_false_bb = codegenWindowResetStateControlFlow();
143  auto aggregate_state = aggregateWindowStatePtr();
144  llvm::Value* aggregate_state_count = nullptr;
145  const auto window_func_context =
147  const auto window_func = window_func_context->getWindowFunction();
148  if (window_func->getKind() == SqlWindowFunctionKind::AVG) {
149  const auto aggregate_state_count_i64 = cgen_state_->llInt(
150  reinterpret_cast<const int64_t>(window_func_context->aggregateStateCount()));
151  const auto pi64_type =
152  llvm::PointerType::get(get_int_type(64, cgen_state_->context_), 0);
153  aggregate_state_count =
154  cgen_state_->ir_builder_.CreateIntToPtr(aggregate_state_count_i64, pi64_type);
155  }
156  codegenWindowFunctionStateInit(aggregate_state);
157  if (window_func->getKind() == SqlWindowFunctionKind::AVG) {
158  const auto count_zero = cgen_state_->llInt(int64_t(0));
159  cgen_state_->emitCall("agg_id", {aggregate_state_count, count_zero});
160  }
161  cgen_state_->ir_builder_.CreateBr(reset_state_false_bb);
162  cgen_state_->ir_builder_.SetInsertPoint(reset_state_false_bb);
164  return codegenWindowFunctionAggregateCalls(aggregate_state, co);
165 }
llvm::Value * aggregateWindowStatePtr()
std::unique_ptr< CgenState > cgen_state_
Definition: Execute.h:1009
llvm::Type * get_int_type(const int width, llvm::LLVMContext &context)
static WindowFunctionContext * getActiveWindowFunctionContext(Executor *executor)
static const WindowProjectNodeContext * get(Executor *executor)
#define AUTOMATIC_IR_METADATA(CGENSTATE)
void codegenWindowFunctionStateInit(llvm::Value *aggregate_state)
#define CHECK(condition)
Definition: Logger.h:203
llvm::Value * codegenWindowFunctionAggregateCalls(llvm::Value *aggregate_state, const CompilationOptions &co)
llvm::BasicBlock * codegenWindowResetStateControlFlow()

llvm::Value * Executor::codegenWindowFunctionAggregateCalls ( llvm::Value *  aggregate_state,
const CompilationOptions co 
)
private

Definition at line 246 of file WindowFunctionIR.cpp.

References run_benchmark_import::args, AUTOMATIC_IR_METADATA, AVG, CHECK, CHECK_EQ, CodeGenerator::codegen(), CodeGenerator::codegenCastBetweenIntTypes(), COUNT, anonymous_namespace{WindowFunctionIR.cpp}::get_adjusted_window_type_info(), anonymous_namespace{WindowFunctionIR.cpp}::get_window_agg_name(), WindowProjectNodeContext::getActiveWindowFunctionContext(), kFLOAT, and SUM.

247  {
249  const auto window_func_context =
251  const auto window_func = window_func_context->getWindowFunction();
252  const auto window_func_ti = get_adjusted_window_type_info(window_func);
253  const auto window_func_null_val =
254  window_func_ti.is_fp()
255  ? cgen_state_->inlineFpNull(window_func_ti)
256  : cgen_state_->castToTypeIn(cgen_state_->inlineIntNull(window_func_ti), 64);
257  const auto& args = window_func->getArgs();
258  llvm::Value* crt_val;
259  if (args.empty()) {
260  CHECK(window_func->getKind() == SqlWindowFunctionKind::COUNT);
261  crt_val = cgen_state_->llInt(int64_t(1));
262  } else {
263  CodeGenerator code_generator(this);
264  const auto arg_lvs = code_generator.codegen(args.front().get(), true, co);
265  CHECK_EQ(arg_lvs.size(), size_t(1));
266  if (window_func->getKind() == SqlWindowFunctionKind::SUM && !window_func_ti.is_fp()) {
267  crt_val = code_generator.codegenCastBetweenIntTypes(
268  arg_lvs.front(), args.front()->get_type_info(), window_func_ti, false);
269  } else {
270  crt_val = window_func_ti.get_type() == kFLOAT
271  ? arg_lvs.front()
272  : cgen_state_->castToTypeIn(arg_lvs.front(), 64);
273  }
274  }
275  const auto agg_name = get_window_agg_name(window_func->getKind(), window_func_ti);
276  llvm::Value* multiplicity_lv = nullptr;
277  if (args.empty()) {
278  cgen_state_->emitCall(agg_name, {aggregate_state, crt_val});
279  } else {
280  cgen_state_->emitCall(agg_name + "_skip_val",
281  {aggregate_state, crt_val, window_func_null_val});
282  }
283  if (window_func->getKind() == SqlWindowFunctionKind::AVG) {
284  codegenWindowAvgEpilogue(crt_val, window_func_null_val, multiplicity_lv);
285  }
286  return codegenAggregateWindowState();
287 }
#define CHECK_EQ(x, y)
Definition: Logger.h:211
std::unique_ptr< CgenState > cgen_state_
Definition: Execute.h:1009
static WindowFunctionContext * getActiveWindowFunctionContext(Executor *executor)
std::string get_window_agg_name(const SqlWindowFunctionKind kind, const SQLTypeInfo &window_func_ti)
void codegenWindowAvgEpilogue(llvm::Value *crt_val, llvm::Value *window_func_null_val, llvm::Value *multiplicity_lv)
#define AUTOMATIC_IR_METADATA(CGENSTATE)
llvm::Value * codegenAggregateWindowState()
#define CHECK(condition)
Definition: Logger.h:203
SQLTypeInfo get_adjusted_window_type_info(const Analyzer::WindowFunction *window_func)

void Executor::codegenWindowFunctionStateInit ( llvm::Value *  aggregate_state)
private

Definition at line 196 of file WindowFunctionIR.cpp.

References AUTOMATIC_IR_METADATA, COUNT, anonymous_namespace{WindowFunctionIR.cpp}::get_adjusted_window_type_info(), get_int_type(), WindowProjectNodeContext::getActiveWindowFunctionContext(), kDOUBLE, and kFLOAT.

196  {
198  const auto window_func_context =
200  const auto window_func = window_func_context->getWindowFunction();
201  const auto window_func_ti = get_adjusted_window_type_info(window_func);
202  const auto window_func_null_val =
203  window_func_ti.is_fp()
204  ? cgen_state_->inlineFpNull(window_func_ti)
205  : cgen_state_->castToTypeIn(cgen_state_->inlineIntNull(window_func_ti), 64);
206  llvm::Value* window_func_init_val;
207  if (window_func_context->getWindowFunction()->getKind() ==
209  switch (window_func_ti.get_type()) {
210  case kFLOAT: {
211  window_func_init_val = cgen_state_->llFp(float(0));
212  break;
213  }
214  case kDOUBLE: {
215  window_func_init_val = cgen_state_->llFp(double(0));
216  break;
217  }
218  default: {
219  window_func_init_val = cgen_state_->llInt(int64_t(0));
220  break;
221  }
222  }
223  } else {
224  window_func_init_val = window_func_null_val;
225  }
226  const auto pi32_type =
227  llvm::PointerType::get(get_int_type(32, cgen_state_->context_), 0);
228  switch (window_func_ti.get_type()) {
229  case kDOUBLE: {
230  cgen_state_->emitCall("agg_id_double", {aggregate_state, window_func_init_val});
231  break;
232  }
233  case kFLOAT: {
234  aggregate_state =
235  cgen_state_->ir_builder_.CreateBitCast(aggregate_state, pi32_type);
236  cgen_state_->emitCall("agg_id_float", {aggregate_state, window_func_init_val});
237  break;
238  }
239  default: {
240  cgen_state_->emitCall("agg_id", {aggregate_state, window_func_init_val});
241  break;
242  }
243  }
244 }
std::unique_ptr< CgenState > cgen_state_
Definition: Execute.h:1009
llvm::Type * get_int_type(const int width, llvm::LLVMContext &context)
static WindowFunctionContext * getActiveWindowFunctionContext(Executor *executor)
#define AUTOMATIC_IR_METADATA(CGENSTATE)
SQLTypeInfo get_adjusted_window_type_info(const Analyzer::WindowFunction *window_func)

llvm::BasicBlock * Executor::codegenWindowResetStateControlFlow ( )
private

Definition at line 167 of file WindowFunctionIR.cpp.

References AUTOMATIC_IR_METADATA, WindowProjectNodeContext::getActiveWindowFunctionContext(), CodeGenerator::posArg(), and CodeGenerator::toBool().

167  {
169  const auto window_func_context =
171  const auto bitset = cgen_state_->llInt(
172  reinterpret_cast<const int64_t>(window_func_context->partitionStart()));
173  const auto min_val = cgen_state_->llInt(int64_t(0));
174  const auto max_val = cgen_state_->llInt(window_func_context->elementCount() - 1);
175  const auto null_val = cgen_state_->llInt(inline_int_null_value<int64_t>());
176  const auto null_bool_val = cgen_state_->llInt<int8_t>(inline_int_null_value<int8_t>());
177  CodeGenerator code_generator(this);
178  const auto reset_state =
179  code_generator.toBool(cgen_state_->emitCall("bit_is_set",
180  {bitset,
181  code_generator.posArg(nullptr),
182  min_val,
183  max_val,
184  null_val,
185  null_bool_val}));
186  const auto reset_state_true_bb = llvm::BasicBlock::Create(
187  cgen_state_->context_, "reset_state.true", cgen_state_->current_func_);
188  const auto reset_state_false_bb = llvm::BasicBlock::Create(
189  cgen_state_->context_, "reset_state.false", cgen_state_->current_func_);
190  cgen_state_->ir_builder_.CreateCondBr(
191  reset_state, reset_state_true_bb, reset_state_false_bb);
192  cgen_state_->ir_builder_.SetInsertPoint(reset_state_true_bb);
193  return reset_state_false_bb;
194 }
std::unique_ptr< CgenState > cgen_state_
Definition: Execute.h:1009
static WindowFunctionContext * getActiveWindowFunctionContext(Executor *executor)
#define AUTOMATIC_IR_METADATA(CGENSTATE)

ResultSetPtr Executor::collectAllDeviceResults ( SharedKernelContext shared_context,
const RelAlgExecutionUnit ra_exe_unit,
const QueryMemoryDescriptor query_mem_desc,
const ExecutorDeviceType  device_type,
std::shared_ptr< RowSetMemoryOwner row_set_mem_owner 
)
private

Definition at line 1855 of file Execute.cpp.

References anonymous_namespace{Execute.cpp}::build_row_for_empty_input(), catalog_(), DEBUG_TIMER, SharedKernelContext::getFragmentResults(), QueryMemoryDescriptor::getQueryDescriptionType(), GPU, NonGroupedAggregate, GroupByAndAggregate::shard_count_for_top_groups(), RelAlgExecutionUnit::target_exprs, and use_speculative_top_n().

1860  {
1861  auto timer = DEBUG_TIMER(__func__);
1862  auto& result_per_device = shared_context.getFragmentResults();
1863  if (result_per_device.empty() && query_mem_desc.getQueryDescriptionType() ==
1864  QueryDescriptionType::NonGroupedAggregate) {
1865  return build_row_for_empty_input(
1866  ra_exe_unit.target_exprs, query_mem_desc, device_type);
1867  }
1868  if (use_speculative_top_n(ra_exe_unit, query_mem_desc)) {
1869  try {
1870  return reduceSpeculativeTopN(
1871  ra_exe_unit, result_per_device, row_set_mem_owner, query_mem_desc);
1872  } catch (const std::bad_alloc&) {
1873  throw SpeculativeTopNFailed("Failed during multi-device reduction.");
1874  }
1875  }
1876  const auto shard_count =
1877  device_type == ExecutorDeviceType::GPU
1878  ? GroupByAndAggregate::shard_count_for_top_groups(ra_exe_unit, *catalog_)
1879  : 0;
1880 
1881  if (shard_count && !result_per_device.empty()) {
1882  return collectAllDeviceShardedTopResults(shared_context, ra_exe_unit);
1883  }
1884  return reduceMultiDeviceResults(
1885  ra_exe_unit, result_per_device, row_set_mem_owner, query_mem_desc);
1886 }
std::vector< Analyzer::Expr * > target_exprs
bool use_speculative_top_n(const RelAlgExecutionUnit &ra_exe_unit, const QueryMemoryDescriptor &query_mem_desc)
ResultSetPtr reduceSpeculativeTopN(const RelAlgExecutionUnit &, std::vector< std::pair< ResultSetPtr, std::vector< size_t >>> &all_fragment_results, std::shared_ptr< RowSetMemoryOwner >, const QueryMemoryDescriptor &) const
Definition: Execute.cpp:1007
const Catalog_Namespace::Catalog * catalog_
Definition: Execute.h:1053
ResultSetPtr reduceMultiDeviceResults(const RelAlgExecutionUnit &, std::vector< std::pair< ResultSetPtr, std::vector< size_t >>> &all_fragment_results, std::shared_ptr< RowSetMemoryOwner >, const QueryMemoryDescriptor &) const
Definition: Execute.cpp:900
ResultSetPtr collectAllDeviceShardedTopResults(SharedKernelContext &shared_context, const RelAlgExecutionUnit &ra_exe_unit) const
Definition: Execute.cpp:1970
QueryDescriptionType getQueryDescriptionType() const
ResultSetPtr build_row_for_empty_input(const std::vector< Analyzer::Expr * > &target_exprs_in, const QueryMemoryDescriptor &query_mem_desc, const ExecutorDeviceType device_type)
Definition: Execute.cpp:1813
std::vector< std::pair< ResultSetPtr, std::vector< size_t > > > & getFragmentResults()
#define DEBUG_TIMER(name)
Definition: Logger.h:319
static size_t shard_count_for_top_groups(const RelAlgExecutionUnit &ra_exe_unit, const Catalog_Namespace::Catalog &catalog)
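A condensed view of the reduction strategy chosen above (illustrative, in the same order as the code):

//   1. no per-device results and a non-grouped aggregate -> build_row_for_empty_input(...)
//   2. use_speculative_top_n(...) holds                   -> reduceSpeculativeTopN(...)
//      (std::bad_alloc is rethrown as SpeculativeTopNFailed)
//   3. GPU execution with sharded top groups              -> collectAllDeviceShardedTopResults(...)
//   4. otherwise                                          -> reduceMultiDeviceResults(...)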

ResultSetPtr Executor::collectAllDeviceShardedTopResults ( SharedKernelContext shared_context,
const RelAlgExecutionUnit ra_exe_unit 
) const
private

Definition at line 1970 of file Execute.cpp.

References catalog_(), CHECK, CHECK_EQ, CHECK_LE, SharedKernelContext::getFragmentResults(), SortInfo::limit, SortInfo::offset, SortInfo::order_entries, anonymous_namespace{Execute.cpp}::permute_storage_columnar(), anonymous_namespace{Execute.cpp}::permute_storage_row_wise(), run_benchmark_import::result, and RelAlgExecutionUnit::sort_info.

1972  {
1973  auto& result_per_device = shared_context.getFragmentResults();
1974  const auto first_result_set = result_per_device.front().first;
1975  CHECK(first_result_set);
1976  auto top_query_mem_desc = first_result_set->getQueryMemDesc();
1977  CHECK(!top_query_mem_desc.hasInterleavedBinsOnGpu());
1978  const auto top_n = ra_exe_unit.sort_info.limit + ra_exe_unit.sort_info.offset;
1979  top_query_mem_desc.setEntryCount(0);
1980  for (auto& result : result_per_device) {
1981  const auto result_set = result.first;
1982  CHECK(result_set);
1983  result_set->sort(ra_exe_unit.sort_info.order_entries, top_n, this);
1984  size_t new_entry_cnt = top_query_mem_desc.getEntryCount() + result_set->rowCount();
1985  top_query_mem_desc.setEntryCount(new_entry_cnt);
1986  }
1987  auto top_result_set = std::make_shared<ResultSet>(first_result_set->getTargetInfos(),
1988  first_result_set->getDeviceType(),
1989  top_query_mem_desc,
1990  first_result_set->getRowSetMemOwner(),
1991  catalog_,
1992  blockSize(),
1993  gridSize());
1994  auto top_storage = top_result_set->allocateStorage();
1995  size_t top_output_row_idx{0};
1996  for (auto& result : result_per_device) {
1997  const auto result_set = result.first;
1998  CHECK(result_set);
1999  const auto& top_permutation = result_set->getPermutationBuffer();
2000  CHECK_LE(top_permutation.size(), top_n);
2001  if (top_query_mem_desc.didOutputColumnar()) {
2002  top_output_row_idx = permute_storage_columnar(result_set->getStorage(),
2003  result_set->getQueryMemDesc(),
2004  top_storage,
2005  top_output_row_idx,
2006  top_query_mem_desc,
2007  top_permutation);
2008  } else {
2009  top_output_row_idx = permute_storage_row_wise(result_set->getStorage(),
2010  top_storage,
2011  top_output_row_idx,
2012  top_query_mem_desc,
2013  top_permutation);
2014  }
2015  }
2016  CHECK_EQ(top_output_row_idx, top_query_mem_desc.getEntryCount());
2017  return top_result_set;
2018 }
#define CHECK_EQ(x, y)
Definition: Logger.h:211
const std::list< Analyzer::OrderEntry > order_entries
size_t permute_storage_row_wise(const ResultSetStorage *input_storage, const ResultSetStorage *output_storage, size_t output_row_index, const QueryMemoryDescriptor &output_query_mem_desc, const std::vector< uint32_t > &top_permutation)
Definition: Execute.cpp:1949
const size_t limit
const SortInfo sort_info
const Catalog_Namespace::Catalog * catalog_
Definition: Execute.h:1053
#define CHECK_LE(x, y)
Definition: Logger.h:214
unsigned gridSize() const
Definition: Execute.cpp:3324
size_t permute_storage_columnar(const ResultSetStorage *input_storage, const QueryMemoryDescriptor &input_query_mem_desc, const ResultSetStorage *output_storage, size_t output_row_index, const QueryMemoryDescriptor &output_query_mem_desc, const std::vector< uint32_t > &top_permutation)
Definition: Execute.cpp:1899
std::vector< std::pair< ResultSetPtr, std::vector< size_t > > > & getFragmentResults()
#define CHECK(condition)
Definition: Logger.h:203
unsigned blockSize() const
Definition: Execute.cpp:3341
const size_t offset
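A hedged worked example of the merge (numbers hypothetical): with sort_info.limit = 10 and sort_info.offset = 5, top_n is 15, so each per-device ResultSet is sorted and capped at 15 rows; the merged descriptor's entry count is the sum of the per-device row counts, and rows are then copied in permutation order into the merged ResultSet through the columnar or row-wise path selected by didOutputColumnar().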

bool Executor::compileBody ( const RelAlgExecutionUnit ra_exe_unit,
GroupByAndAggregate group_by_and_aggregate,
const QueryMemoryDescriptor query_mem_desc,
const CompilationOptions co,
const GpuSharedMemoryContext gpu_smem_context = {} 
)
private

Definition at line 2988 of file NativeCodegen.cpp.

2992  {
2994 
2995  // Switch the code generation into a separate filter function if enabled.
2996  // Note that accesses to function arguments are still codegenned from the
2997  // row function's arguments, then later automatically forwarded and
2998  // remapped into filter function arguments by redeclareFilterFunction().
2999  cgen_state_->row_func_bb_ = cgen_state_->ir_builder_.GetInsertBlock();
3000  llvm::Value* loop_done{nullptr};
3001  std::unique_ptr<Executor::FetchCacheAnchor> fetch_cache_anchor;
3002  if (cgen_state_->filter_func_) {
3003  if (cgen_state_->row_func_bb_->getName() == "loop_body") {
3004  auto row_func_entry_bb = &cgen_state_->row_func_->getEntryBlock();
3005  cgen_state_->ir_builder_.SetInsertPoint(row_func_entry_bb,
3006  row_func_entry_bb->begin());
3007  loop_done = cgen_state_->ir_builder_.CreateAlloca(
3008  get_int_type(1, cgen_state_->context_), nullptr, "loop_done");
3009  cgen_state_->ir_builder_.SetInsertPoint(cgen_state_->row_func_bb_);
3010  cgen_state_->ir_builder_.CreateStore(cgen_state_->llBool(true), loop_done);
3011  }
3012  cgen_state_->ir_builder_.SetInsertPoint(cgen_state_->filter_func_bb_);
3013  cgen_state_->current_func_ = cgen_state_->filter_func_;
3014  fetch_cache_anchor = std::make_unique<Executor::FetchCacheAnchor>(cgen_state_.get());
3015  }
3016 
3017  // generate the code for the filter
3018  std::vector<Analyzer::Expr*> primary_quals;
3019  std::vector<Analyzer::Expr*> deferred_quals;
3020  bool short_circuited = CodeGenerator::prioritizeQuals(
3021  ra_exe_unit, primary_quals, deferred_quals, plan_state_->hoisted_filters_);
3022  if (short_circuited) {
3023  VLOG(1) << "Prioritized " << std::to_string(primary_quals.size()) << " quals, "
3024  << "short-circuited and deferred " << std::to_string(deferred_quals.size())
3025  << " quals";
3026  }
3027  llvm::Value* filter_lv = cgen_state_->llBool(true);
3028  CodeGenerator code_generator(this);
3029  for (auto expr : primary_quals) {
3030  // Generate the filter for primary quals
3031  auto cond = code_generator.toBool(code_generator.codegen(expr, true, co).front());
3032  filter_lv = cgen_state_->ir_builder_.CreateAnd(filter_lv, cond);
3033  }
3034  CHECK(filter_lv->getType()->isIntegerTy(1));
3035  llvm::BasicBlock* sc_false{nullptr};
3036  if (!deferred_quals.empty()) {
3037  auto sc_true = llvm::BasicBlock::Create(
3038  cgen_state_->context_, "sc_true", cgen_state_->current_func_);
3039  sc_false = llvm::BasicBlock::Create(
3040  cgen_state_->context_, "sc_false", cgen_state_->current_func_);
3041  cgen_state_->ir_builder_.CreateCondBr(filter_lv, sc_true, sc_false);
3042  cgen_state_->ir_builder_.SetInsertPoint(sc_false);
3043  if (ra_exe_unit.join_quals.empty()) {
3044  cgen_state_->ir_builder_.CreateRet(cgen_state_->llInt(int32_t(0)));
3045  }
3046  cgen_state_->ir_builder_.SetInsertPoint(sc_true);
3047  filter_lv = cgen_state_->llBool(true);
3048  }
3049  for (auto expr : deferred_quals) {
3050  filter_lv = cgen_state_->ir_builder_.CreateAnd(
3051  filter_lv, code_generator.toBool(code_generator.codegen(expr, true, co).front()));
3052  }
3053 
3054  CHECK(filter_lv->getType()->isIntegerTy(1));
3055  auto ret = group_by_and_aggregate.codegen(
3056  filter_lv, sc_false, query_mem_desc, co, gpu_smem_context);
3057 
3058  // Switch the code generation back to the row function if a filter
3059  // function was enabled.
3060  if (cgen_state_->filter_func_) {
3061  if (cgen_state_->row_func_bb_->getName() == "loop_body") {
3062  cgen_state_->ir_builder_.CreateStore(cgen_state_->llBool(false), loop_done);
3063  cgen_state_->ir_builder_.CreateRet(cgen_state_->llInt<int32_t>(0));
3064  }
3065 
3066  cgen_state_->ir_builder_.SetInsertPoint(cgen_state_->row_func_bb_);
3067  cgen_state_->current_func_ = cgen_state_->row_func_;
3068  cgen_state_->filter_func_call_ =
3069  cgen_state_->ir_builder_.CreateCall(cgen_state_->filter_func_, {});
3070 
3071  // Create real filter function declaration after placeholder call
3072  // is emitted.
3074 
3075  if (cgen_state_->row_func_bb_->getName() == "loop_body") {
3076  auto loop_done_true = llvm::BasicBlock::Create(
3077  cgen_state_->context_, "loop_done_true", cgen_state_->row_func_);
3078  auto loop_done_false = llvm::BasicBlock::Create(
3079  cgen_state_->context_, "loop_done_false", cgen_state_->row_func_);
3080  auto loop_done_flag = cgen_state_->ir_builder_.CreateLoad(loop_done);
3081  cgen_state_->ir_builder_.CreateCondBr(
3082  loop_done_flag, loop_done_true, loop_done_false);
3083  cgen_state_->ir_builder_.SetInsertPoint(loop_done_true);
3084  cgen_state_->ir_builder_.CreateRet(cgen_state_->filter_func_call_);
3085  cgen_state_->ir_builder_.SetInsertPoint(loop_done_false);
3086  } else {
3087  cgen_state_->ir_builder_.CreateRet(cgen_state_->filter_func_call_);
3088  }
3089  }
3090  return ret;
3091 }
bool codegen(llvm::Value *filter_result, llvm::BasicBlock *sc_false, const QueryMemoryDescriptor &query_mem_desc, const CompilationOptions &co, const GpuSharedMemoryContext &gpu_smem_context)
std::unique_ptr< CgenState > cgen_state_
Definition: Execute.h:1009
llvm::Type * get_int_type(const int width, llvm::LLVMContext &context)
std::string to_string(char const *&&v)
const JoinQualsPerNestingLevel join_quals
std::unique_ptr< PlanState > plan_state_
Definition: Execute.h:1024
#define AUTOMATIC_IR_METADATA(CGENSTATE)
static bool prioritizeQuals(const RelAlgExecutionUnit &ra_exe_unit, std::vector< Analyzer::Expr * > &primary_quals, std::vector< Analyzer::Expr * > &deferred_quals, const PlanState::HoistedFiltersSet &hoisted_quals)
Definition: LogicalIR.cpp:157
#define CHECK(condition)
Definition: Logger.h:203
void redeclareFilterFunction()
Definition: IRCodegen.cpp:708
#define VLOG(n)
Definition: Logger.h:297
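For a query whose conjuncts are split by prioritizeQuals, the generated filter has the short-circuit shape sketched below (illustrative; block names match those created above):

//   %filter  = <AND of all primary quals, seeded with true>
//   br i1 %filter, label %sc_true, label %sc_false
// sc_false:
//   ret i32 0                      ; only emitted when there are no join quals
// sc_true:
//   %filter2 = <AND of all deferred quals, seeded with true>
//   ; %filter2 and %sc_false are handed to GroupByAndAggregate::codegen()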
std::tuple< CompilationResult, std::unique_ptr< QueryMemoryDescriptor > > Executor::compileWorkUnit ( const std::vector< InputTableInfo > &  query_infos,
const PlanState::DeletedColumnsMap deleted_cols_map,
const RelAlgExecutionUnit ra_exe_unit,
const CompilationOptions co,
const ExecutionOptions eo,
const CudaMgr_Namespace::CudaMgr cuda_mgr,
const bool  allow_lazy_fetch,
std::shared_ptr< RowSetMemoryOwner row_set_mem_owner,
const size_t  max_groups_buffer_entry_count,
const int8_t  crt_min_byte_width,
const bool  has_cardinality_estimation,
ColumnCacheMap column_cache,
RenderInfo render_info = nullptr 
)
private

Definition at line 2479 of file NativeCodegen.cpp.

2491  {
2492  auto timer = DEBUG_TIMER(__func__);
2493 
2495  const auto cuda_mgr = catalog_->getDataMgr().getCudaMgr();
2496  if (!cuda_mgr) {
2497  throw QueryMustRunOnCpu();
2498  }
2499  }
2500 
2501 #ifndef NDEBUG
2502  static std::uint64_t counter = 0;
2503  ++counter;
2504  VLOG(1) << "CODEGEN #" << counter << ":";
2505  LOG(IR) << "CODEGEN #" << counter << ":";
2506  LOG(PTX) << "CODEGEN #" << counter << ":";
2507  LOG(ASM) << "CODEGEN #" << counter << ":";
2508 #endif
2509 
2510  nukeOldState(allow_lazy_fetch, query_infos, deleted_cols_map, &ra_exe_unit);
2511 
2512  GroupByAndAggregate group_by_and_aggregate(
2513  this,
2514  co.device_type,
2515  ra_exe_unit,
2516  query_infos,
2517  row_set_mem_owner,
2518  has_cardinality_estimation ? std::optional<int64_t>(max_groups_buffer_entry_guess)
2519  : std::nullopt);
2520  auto query_mem_desc =
2521  group_by_and_aggregate.initQueryMemoryDescriptor(eo.allow_multifrag,
2522  max_groups_buffer_entry_guess,
2523  crt_min_byte_width,
2524  render_info,
2526 
2527  if (query_mem_desc->getQueryDescriptionType() ==
2529  !has_cardinality_estimation &&
2530  (!render_info || !render_info->isPotentialInSituRender()) && !eo.just_explain) {
2531  const auto col_range_info = group_by_and_aggregate.getColRangeInfo();
2532  throw CardinalityEstimationRequired(col_range_info.max - col_range_info.min);
2533  }
2534 
2535  const bool output_columnar = query_mem_desc->didOutputColumnar();
2536  const bool gpu_shared_mem_optimization =
2538  ra_exe_unit,
2539  cuda_mgr,
2540  co.device_type,
2541  cuda_mgr ? this->blockSize() : 1,
2542  cuda_mgr ? this->numBlocksPerMP() : 1);
2543  if (gpu_shared_mem_optimization) {
2544  // disable interleaved bins optimization on the GPU
2545  query_mem_desc->setHasInterleavedBinsOnGpu(false);
2546  LOG(DEBUG1) << "GPU shared memory is used for the " +
2547  query_mem_desc->queryDescTypeToString() + " query(" +
2548  std::to_string(get_shared_memory_size(gpu_shared_mem_optimization,
2549  query_mem_desc.get())) +
2550  " out of " + std::to_string(g_gpu_smem_threshold) + " bytes).";
2551  }
2552 
2553  const GpuSharedMemoryContext gpu_smem_context(
2554  get_shared_memory_size(gpu_shared_mem_optimization, query_mem_desc.get()));
2555 
2557  const size_t num_count_distinct_descs =
2558  query_mem_desc->getCountDistinctDescriptorsSize();
2559  for (size_t i = 0; i < num_count_distinct_descs; i++) {
2560  const auto& count_distinct_descriptor =
2561  query_mem_desc->getCountDistinctDescriptor(i);
2562  if (count_distinct_descriptor.impl_type_ == CountDistinctImplType::StdSet ||
2563  (count_distinct_descriptor.impl_type_ != CountDistinctImplType::Invalid &&
2564  !co.hoist_literals)) {
2565  throw QueryMustRunOnCpu();
2566  }
2567  }
2568  }
2569 
2570  // Read the module template and target either CPU or GPU
2571  // by binding the stream position functions to the right implementation:
2572  // stride access for GPU, contiguous for CPU
2573  auto rt_module_copy = llvm::CloneModule(
2574  *g_rt_module.get(), cgen_state_->vmap_, [](const llvm::GlobalValue* gv) {
2575  auto func = llvm::dyn_cast<llvm::Function>(gv);
2576  if (!func) {
2577  return true;
2578  }
2579  return (func->getLinkage() == llvm::GlobalValue::LinkageTypes::PrivateLinkage ||
2580  func->getLinkage() == llvm::GlobalValue::LinkageTypes::InternalLinkage ||
2581  CodeGenerator::alwaysCloneRuntimeFunction(func));
2582  });
2583  if (co.device_type == ExecutorDeviceType::CPU) {
2584  if (is_udf_module_present(true)) {
2586  }
2587  if (is_rt_udf_module_present(true)) {
2589  rt_udf_cpu_module, *rt_module_copy, cgen_state_.get());
2590  }
2591  } else {
2592  rt_module_copy->setDataLayout(get_gpu_data_layout());
2593  rt_module_copy->setTargetTriple(get_gpu_target_triple_string());
2594  if (is_udf_module_present()) {
2596  }
2597  if (is_rt_udf_module_present()) {
2599  rt_udf_gpu_module, *rt_module_copy, cgen_state_.get());
2600  }
2601  }
2602 
2603  cgen_state_->module_ = rt_module_copy.release();
2605 
2606  auto agg_fnames =
2607  get_agg_fnames(ra_exe_unit.target_exprs, !ra_exe_unit.groupby_exprs.empty());
2608 
2609  const auto agg_slot_count = ra_exe_unit.estimator ? size_t(1) : agg_fnames.size();
2610 
2611  const bool is_group_by{query_mem_desc->isGroupBy()};
2612  auto [query_func, row_func_call] = is_group_by
2613  ? query_group_by_template(cgen_state_->module_,
2614  co.hoist_literals,
2615  *query_mem_desc,
2616  co.device_type,
2617  ra_exe_unit.scan_limit,
2618  gpu_smem_context)
2619  : query_template(cgen_state_->module_,
2620  agg_slot_count,
2621  co.hoist_literals,
2622  !!ra_exe_unit.estimator,
2623  gpu_smem_context);
2624  bind_pos_placeholders("pos_start", true, query_func, cgen_state_->module_);
2625  bind_pos_placeholders("group_buff_idx", false, query_func, cgen_state_->module_);
2626  bind_pos_placeholders("pos_step", false, query_func, cgen_state_->module_);
2627 
2628  cgen_state_->query_func_ = query_func;
2629  cgen_state_->row_func_call_ = row_func_call;
2630  cgen_state_->query_func_entry_ir_builder_.SetInsertPoint(
2631  &query_func->getEntryBlock().front());
2632 
2633  // Generate the function signature and column head fetches s.t.
2634  // double indirection isn't needed in the inner loop
2635  auto& fetch_bb = query_func->front();
2636  llvm::IRBuilder<> fetch_ir_builder(&fetch_bb);
2637  fetch_ir_builder.SetInsertPoint(&*fetch_bb.begin());
2638  auto col_heads = generate_column_heads_load(ra_exe_unit.input_col_descs.size(),
2639  query_func->args().begin(),
2640  fetch_ir_builder,
2641  cgen_state_->context_);
2642  CHECK_EQ(ra_exe_unit.input_col_descs.size(), col_heads.size());
2643 
2644  cgen_state_->row_func_ = create_row_function(ra_exe_unit.input_col_descs.size(),
2645  is_group_by ? 0 : agg_slot_count,
2646  co.hoist_literals,
2647  cgen_state_->module_,
2648  cgen_state_->context_);
2649  CHECK(cgen_state_->row_func_);
2650  cgen_state_->row_func_bb_ =
2651  llvm::BasicBlock::Create(cgen_state_->context_, "entry", cgen_state_->row_func_);
2652 
2653  if (g_enable_filter_function) {
2654  auto filter_func_ft =
2655  llvm::FunctionType::get(get_int_type(32, cgen_state_->context_), {}, false);
2656  cgen_state_->filter_func_ = llvm::Function::Create(filter_func_ft,
2657  llvm::Function::ExternalLinkage,
2658  "filter_func",
2659  cgen_state_->module_);
2660  CHECK(cgen_state_->filter_func_);
2661  cgen_state_->filter_func_bb_ = llvm::BasicBlock::Create(
2662  cgen_state_->context_, "entry", cgen_state_->filter_func_);
2663  }
2664 
2665  cgen_state_->current_func_ = cgen_state_->row_func_;
2666  cgen_state_->ir_builder_.SetInsertPoint(cgen_state_->row_func_bb_);
2667 
2668  preloadFragOffsets(ra_exe_unit.input_descs, query_infos);
2669  RelAlgExecutionUnit body_execution_unit = ra_exe_unit;
2670  const auto join_loops =
2671  buildJoinLoops(body_execution_unit, co, eo, query_infos, column_cache);
2672 
2673  plan_state_->allocateLocalColumnIds(ra_exe_unit.input_col_descs);
2674  const auto is_not_deleted_bb = codegenSkipDeletedOuterTableRow(ra_exe_unit, co);
2675  if (is_not_deleted_bb) {
2676  cgen_state_->row_func_bb_ = is_not_deleted_bb;
2677  }
2678  if (!join_loops.empty()) {
2679  codegenJoinLoops(join_loops,
2680  body_execution_unit,
2681  group_by_and_aggregate,
2682  query_func,
2683  cgen_state_->row_func_bb_,
2684  *(query_mem_desc.get()),
2685  co,
2686  eo);
2687  } else {
2688  const bool can_return_error = compileBody(
2689  ra_exe_unit, group_by_and_aggregate, *query_mem_desc, co, gpu_smem_context);
2690  if (can_return_error || cgen_state_->needs_error_check_ || eo.with_dynamic_watchdog ||
2691  eo.allow_runtime_query_interrupt) {
2692  createErrorCheckControlFlow(query_func,
2693  eo.with_dynamic_watchdog,
2694  eo.allow_runtime_query_interrupt,
2695  co.device_type,
2696  group_by_and_aggregate.query_infos_);
2697  }
2698  }
2699  std::vector<llvm::Value*> hoisted_literals;
2700 
2701  if (co.hoist_literals) {
2702  VLOG(1) << "number of hoisted literals: "
2703  << cgen_state_->query_func_literal_loads_.size()
2704  << " / literal buffer usage: " << cgen_state_->getLiteralBufferUsage(0)
2705  << " bytes";
2706  }
2707 
2708  if (co.hoist_literals && !cgen_state_->query_func_literal_loads_.empty()) {
2709  // we have some hoisted literals...
2710  hoisted_literals = inlineHoistedLiterals();
2711  }
2712 
2713  // replace the row func placeholder call with the call to the actual row func
2714  std::vector<llvm::Value*> row_func_args;
2715  for (size_t i = 0; i < cgen_state_->row_func_call_->getNumArgOperands(); ++i) {
2716  row_func_args.push_back(cgen_state_->row_func_call_->getArgOperand(i));
2717  }
2718  row_func_args.insert(row_func_args.end(), col_heads.begin(), col_heads.end());
2719  row_func_args.push_back(get_arg_by_name(query_func, "join_hash_tables"));
2720  // push hoisted literals arguments, if any
2721  row_func_args.insert(
2722  row_func_args.end(), hoisted_literals.begin(), hoisted_literals.end());
2723  llvm::ReplaceInstWithInst(
2724  cgen_state_->row_func_call_,
2725  llvm::CallInst::Create(cgen_state_->row_func_, row_func_args, ""));
2726 
2727  // replace the filter func placeholder call with the call to the actual filter func
2728  if (cgen_state_->filter_func_) {
2729  std::vector<llvm::Value*> filter_func_args;
2730  for (auto arg_it = cgen_state_->filter_func_args_.begin();
2731  arg_it != cgen_state_->filter_func_args_.end();
2732  ++arg_it) {
2733  filter_func_args.push_back(arg_it->first);
2734  }
2735  llvm::ReplaceInstWithInst(
2736  cgen_state_->filter_func_call_,
2737  llvm::CallInst::Create(cgen_state_->filter_func_, filter_func_args, ""));
2738  }
2739 
2740  // Aggregate
2741  plan_state_->init_agg_vals_ =
2742  init_agg_val_vec(ra_exe_unit.target_exprs, ra_exe_unit.quals, *query_mem_desc);
2743 
2744  /*
2745  * If we have decided to use GPU shared memory (decision is not made here), then
2746  * we generate proper code for extra components that it needs (buffer initialization and
2747  * gpu reduction from shared memory to global memory). We then replace these functions
2748  * into the already compiled query_func (replacing two placeholders, write_back_nop and
2749  * init_smem_nop). The rest of the code should be as before (row_func, etc.).
2750  */
2751  if (gpu_smem_context.isSharedMemoryUsed()) {
2752  if (query_mem_desc->getQueryDescriptionType() ==
2754  GpuSharedMemCodeBuilder gpu_smem_code(
2755  cgen_state_->module_,
2756  cgen_state_->context_,
2757  *query_mem_desc,
2759  plan_state_->init_agg_vals_);
2760  gpu_smem_code.codegen();
2761  gpu_smem_code.injectFunctionsInto(query_func);
2762 
2763  // helper functions are used for caching purposes later
2764  cgen_state_->helper_functions_.push_back(gpu_smem_code.getReductionFunction());
2765  cgen_state_->helper_functions_.push_back(gpu_smem_code.getInitFunction());
2766  LOG(IR) << gpu_smem_code.toString();
2767  }
2768  }
2769 
2770  auto multifrag_query_func = cgen_state_->module_->getFunction(
2771  "multifrag_query" + std::string(co.hoist_literals ? "_hoisted_literals" : ""));
2772  CHECK(multifrag_query_func);
2773 
2776  multifrag_query_func, co.hoist_literals, eo.allow_runtime_query_interrupt);
2777  }
2778 
2779  bind_query(query_func,
2780  "query_stub" + std::string(co.hoist_literals ? "_hoisted_literals" : ""),
2781  multifrag_query_func,
2782  cgen_state_->module_);
2783 
2784  std::vector<llvm::Function*> root_funcs{query_func, cgen_state_->row_func_};
2785  if (cgen_state_->filter_func_) {
2786  root_funcs.push_back(cgen_state_->filter_func_);
2787  }
2788  auto live_funcs = CodeGenerator::markDeadRuntimeFuncs(
2789  *cgen_state_->module_, root_funcs, {multifrag_query_func});
2790 
2791  // Always inline the row function and the filter function.
2792  // We don't want register spills in the inner loops.
2793  // LLVM seems to correctly free up alloca instructions
2794  // in these functions even when they are inlined.
2795  mark_function_always_inline(cgen_state_->row_func_);
2796  if (cgen_state_->filter_func_) {
2797  mark_function_always_inline(cgen_state_->filter_func_);
2798  }
2799 
2800 #ifndef NDEBUG
2801  // Add helpful metadata to the LLVM IR for debugging.
2803 #endif
2804 
2805  // Serialize the important LLVM IR functions to text for SQL EXPLAIN.
2806  std::string llvm_ir;
2807  if (eo.just_explain) {
2808  if (co.explain_type == ExecutorExplainType::Optimized) {
2809 #ifdef WITH_JIT_DEBUG
2810  throw std::runtime_error(
2811  "Explain optimized not available when JIT runtime debug symbols are enabled");
2812 #else
2813  // Note that we don't run the NVVM reflect pass here. Use LOG(IR) to get the
2814  // optimized IR after NVVM reflect
2815  llvm::legacy::PassManager pass_manager;
2816  optimize_ir(query_func, cgen_state_->module_, pass_manager, live_funcs, co);
2817 #endif // WITH_JIT_DEBUG
2818  }
2819  llvm_ir =
2820  serialize_llvm_object(multifrag_query_func) + serialize_llvm_object(query_func) +
2821  serialize_llvm_object(cgen_state_->row_func_) +
2822  (cgen_state_->filter_func_ ? serialize_llvm_object(cgen_state_->filter_func_)
2823  : "");
2824 
2825 #ifndef NDEBUG
2826  llvm_ir += serialize_llvm_metadata_footnotes(query_func, cgen_state_.get());
2827 #endif
2828  }
2829 
2830  LOG(IR) << "\n\n" << query_mem_desc->toString() << "\n";
2831  LOG(IR) << "IR for the "
2832  << (co.device_type == ExecutorDeviceType::CPU ? "CPU:\n" : "GPU:\n");
2833 #ifdef NDEBUG
2834  LOG(IR) << serialize_llvm_object(query_func)
2835  << serialize_llvm_object(cgen_state_->row_func_)
2836  << (cgen_state_->filter_func_ ? serialize_llvm_object(cgen_state_->filter_func_)
2837  : "")
2838  << "\nEnd of IR";
2839 #else
2840  LOG(IR) << serialize_llvm_object(cgen_state_->module_) << "\nEnd of IR";
2841 #endif
2842 
2843  // Run some basic validation checks on the LLVM IR before code is generated below.
2844  verify_function_ir(cgen_state_->row_func_);
2845  if (cgen_state_->filter_func_) {
2846  verify_function_ir(cgen_state_->filter_func_);
2847  }
2848 
2849  // Generate final native code from the LLVM IR.
2850  return std::make_tuple(
2851  CompilationResult{
2852  co.device_type == ExecutorDeviceType::CPU
2853  ? optimizeAndCodegenCPU(query_func, multifrag_query_func, live_funcs, co)
2854  : optimizeAndCodegenGPU(query_func,
2855  multifrag_query_func,
2856  live_funcs,
2857  is_group_by || ra_exe_unit.estimator,
2858  cuda_mgr,
2859  co),
2860  cgen_state_->getLiterals(),
2861  output_columnar,
2862  llvm_ir,
2863  std::move(gpu_smem_context)},
2864  std::move(query_mem_desc));
2865 }
std::tuple< llvm::Function *, llvm::CallInst * > query_group_by_template(llvm::Module *module, const bool hoist_literals, const QueryMemoryDescriptor &query_mem_desc, const ExecutorDeviceType device_type, const bool check_scan_limit, const GpuSharedMemoryContext &gpu_smem_context)
CudaMgr_Namespace::CudaMgr * getCudaMgr() const
Definition: DataMgr.h:207
std::vector< Analyzer::Expr * > target_exprs
#define CHECK_EQ(x, y)
Definition: Logger.h:211
std::unique_ptr< llvm::Module > rt_udf_cpu_module
void codegenJoinLoops(const std::vector< JoinLoop > &join_loops, const RelAlgExecutionUnit &ra_exe_unit, GroupByAndAggregate &group_by_and_aggregate, llvm::Function *query_func, llvm::BasicBlock *entry_bb, const QueryMemoryDescriptor &query_mem_desc, const CompilationOptions &co, const ExecutionOptions &eo)
Definition: IRCodegen.cpp:824
std::unique_ptr< llvm::Module > udf_gpu_module
Data_Namespace::DataMgr & getDataMgr() const
Definition: Catalog.h:223
#define LOG(tag)
Definition: Logger.h:194
std::unique_ptr< llvm::Module > rt_udf_gpu_module
void mark_function_always_inline(llvm::Function *func)
llvm::StringRef get_gpu_data_layout()
bool is_udf_module_present(bool cpu_only=false)
std::vector< InputDescriptor > input_descs
std::string serialize_llvm_metadata_footnotes(llvm::Function *query_func, CgenState *cgen_state)
std::unique_ptr< CgenState > cgen_state_
Definition: Execute.h:1009
const std::list< std::shared_ptr< Analyzer::Expr > > groupby_exprs
unsigned numBlocksPerMP() const
Definition: Execute.cpp:3333
llvm::Type * get_int_type(const int width, llvm::LLVMContext &context)
void optimize_ir(llvm::Function *query_func, llvm::Module *module, llvm::legacy::PassManager &pass_manager, const std::unordered_set< llvm::Function * > &live_funcs, const CompilationOptions &co)
std::vector< std::string > get_agg_fnames(const std::vector< Analyzer::Expr * > &target_exprs, const bool is_group_by)
std::string to_string(char const *&&v)
void preloadFragOffsets(const std::vector< InputDescriptor > &input_descs, const std::vector< InputTableInfo > &query_infos)
Definition: Execute.cpp:3266
bool is_gpu_shared_mem_supported(const QueryMemoryDescriptor *query_mem_desc_ptr, const RelAlgExecutionUnit &ra_exe_unit, const CudaMgr_Namespace::CudaMgr *cuda_mgr, const ExecutorDeviceType device_type, const unsigned gpu_blocksize, const unsigned num_blocks_per_mp)
llvm::StringRef get_gpu_target_triple_string()
bool compileBody(const RelAlgExecutionUnit &ra_exe_unit, GroupByAndAggregate &group_by_and_aggregate, const QueryMemoryDescriptor &query_mem_desc, const CompilationOptions &co, const GpuSharedMemoryContext &gpu_smem_context={})
void verify_function_ir(const llvm::Function *func)
llvm::Value * get_arg_by_name(llvm::Function *func, const std::string &name)
Definition: Execute.h:167
std::shared_ptr< CompilationContext > optimizeAndCodegenGPU(llvm::Function *, llvm::Function *, std::unordered_set< llvm::Function * > &, const bool no_inline, const CudaMgr_Namespace::CudaMgr *cuda_mgr, const CompilationOptions &)
static std::unordered_set< llvm::Function * > markDeadRuntimeFuncs(llvm::Module &module, const std::vector< llvm::Function * > &roots, const std::vector< llvm::Function * > &leaves)
const Catalog_Namespace::Catalog * catalog_
Definition: Execute.h:1053
std::unique_ptr< llvm::Module > g_rt_module
ExecutorExplainType explain_type
std::shared_ptr< CompilationContext > optimizeAndCodegenCPU(llvm::Function *, llvm::Function *, const std::unordered_set< llvm::Function * > &, const CompilationOptions &)
std::unique_ptr< PlanState > plan_state_
Definition: Execute.h:1024
void insertErrorCodeChecker(llvm::Function *query_func, bool hoist_literals, bool allow_runtime_query_interrupt)
static void link_udf_module(const std::unique_ptr< llvm::Module > &udf_module, llvm::Module &module, CgenState *cgen_state, llvm::Linker::Flags flags=llvm::Linker::Flags::None)
const std::shared_ptr< Analyzer::Estimator > estimator
#define AUTOMATIC_IR_METADATA(CGENSTATE)
#define AUTOMATIC_IR_METADATA_DONE()
ExecutorDeviceType device_type
llvm::BasicBlock * codegenSkipDeletedOuterTableRow(const RelAlgExecutionUnit &ra_exe_unit, const CompilationOptions &co)
std::string serialize_llvm_object(const T *llvm_obj)
static bool alwaysCloneRuntimeFunction(const llvm::Function *func)
std::vector< llvm::Value * > generate_column_heads_load(const int num_columns, llvm::Value *byte_stream_arg, llvm::IRBuilder<> &ir_builder, llvm::LLVMContext &ctx)
std::unique_ptr< llvm::Module > udf_cpu_module
bool g_enable_filter_function
Definition: Execute.cpp:79
void bind_pos_placeholders(const std::string &pos_fn_name, const bool use_resume_param, llvm::Function *query_func, llvm::Module *module)
void nukeOldState(const bool allow_lazy_fetch, const std::vector< InputTableInfo > &query_infos, const PlanState::DeletedColumnsMap &deleted_cols_map, const RelAlgExecutionUnit *ra_exe_unit)
Definition: Execute.cpp:3247
std::tuple< llvm::Function *, llvm::CallInst * > query_template(llvm::Module *module, const size_t aggr_col_count, const bool hoist_literals, const bool is_estimate_query, const GpuSharedMemoryContext &gpu_smem_context)
std::list< std::shared_ptr< Analyzer::Expr > > quals
bool isPotentialInSituRender() const
Definition: RenderInfo.cpp:64
#define CHECK(condition)
Definition: Logger.h:203
#define DEBUG_TIMER(name)
Definition: Logger.h:319
std::vector< llvm::Value * > inlineHoistedLiterals()
std::vector< TargetInfo > target_exprs_to_infos(const std::vector< Analyzer::Expr * > &targets, const QueryMemoryDescriptor &query_mem_desc)
llvm::Function * create_row_function(const size_t in_col_count, const size_t agg_col_count, const bool hoist_literals, llvm::Module *module, llvm::LLVMContext &context)
std::list< std::shared_ptr< const InputColDescriptor > > input_col_descs
std::vector< JoinLoop > buildJoinLoops(RelAlgExecutionUnit &ra_exe_unit, const CompilationOptions &co, const ExecutionOptions &eo, const std::vector< InputTableInfo > &query_infos, ColumnCacheMap &column_cache)
Definition: IRCodegen.cpp:279
unsigned blockSize() const
Definition: Execute.cpp:3341
std::vector< int64_t > init_agg_val_vec(const std::vector< TargetInfo > &targets, const QueryMemoryDescriptor &query_mem_desc)
bool is_rt_udf_module_present(bool cpu_only=false)
#define VLOG(n)
Definition: Logger.h:297
size_t get_shared_memory_size(const bool shared_mem_used, const QueryMemoryDescriptor *query_mem_desc_ptr)
void bind_query(llvm::Function *query_func, const std::string &query_fname, llvm::Function *multifrag_query_func, llvm::Module *module)
void createErrorCheckControlFlow(llvm::Function *query_func, bool run_with_dynamic_watchdog, bool run_with_allowing_runtime_interrupt, ExecutorDeviceType device_type, const std::vector< InputTableInfo > &input_table_infos)
size_t g_gpu_smem_threshold
Definition: Execute.cpp:119
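
The comment at lines 2570-2572 of the listing above explains that the cloned runtime module is specialized for the target device by binding the stream position helpers to a strided row-iteration scheme on GPU and a contiguous one on CPU. The standalone C++ sketch below illustrates the difference between the two access patterns; the function names and signatures are illustrative stand-ins, not the actual pos_start/pos_step implementations bound by bind_pos_placeholders.

#include <algorithm>
#include <cstdint>
#include <iostream>
#include <vector>

// Contiguous iteration (CPU flavor): each worker scans one dense range of rows,
// advancing the position by one per step.
std::vector<int64_t> scan_contiguous(int64_t row_count,
                                     int64_t worker,
                                     int64_t num_workers) {
  std::vector<int64_t> rows;
  const int64_t chunk = (row_count + num_workers - 1) / num_workers;
  const int64_t start = worker * chunk;
  const int64_t end = std::min(row_count, start + chunk);
  for (int64_t pos = start; pos < end; ++pos) {  // step of 1
    rows.push_back(pos);
  }
  return rows;
}

// Strided iteration (GPU flavor): workers interleave rows and advance the position
// by the total worker count per step, as in a grid-stride loop.
std::vector<int64_t> scan_strided(int64_t row_count,
                                  int64_t worker,
                                  int64_t num_workers) {
  std::vector<int64_t> rows;
  for (int64_t pos = worker; pos < row_count; pos += num_workers) {  // step of num_workers
    rows.push_back(pos);
  }
  return rows;
}

int main() {
  const auto cpu_rows = scan_contiguous(10, 0, 2);  // rows 0,1,2,3,4
  const auto gpu_rows = scan_strided(10, 0, 2);     // rows 0,2,4,6,8
  std::cout << cpu_rows.size() << " contiguous rows, " << gpu_rows.size()
            << " strided rows\n";
  return 0;
}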
AggregatedColRange Executor::computeColRangesCache ( const std::unordered_set< PhysicalInput > &  phys_inputs)
private

Definition at line 3745 of file Execute.cpp.

References catalog_(), CHECK, getLeafColumnRange(), AggregatedColRange::setColRange(), and ExpressionRange::typeSupportsRange().

3746  {
3747  AggregatedColRange agg_col_range_cache;
3748  CHECK(catalog_);
3749  std::unordered_set<int> phys_table_ids;
3750  for (const auto& phys_input : phys_inputs) {
3751  phys_table_ids.insert(phys_input.table_id);
3752  }
3753  std::vector<InputTableInfo> query_infos;
3754  for (const int table_id : phys_table_ids) {
3755  query_infos.emplace_back(InputTableInfo{table_id, getTableInfo(table_id)});
3756  }
3757  for (const auto& phys_input : phys_inputs) {
3758  const auto cd =
3759  catalog_->getMetadataForColumn(phys_input.table_id, phys_input.col_id);
3760  CHECK(cd);
3761  if (ExpressionRange::typeSupportsRange(cd->columnType)) {
3762  const auto col_var = boost::make_unique<Analyzer::ColumnVar>(
3763  cd->columnType, phys_input.table_id, phys_input.col_id, 0);
3764  const auto col_range = getLeafColumnRange(col_var.get(), query_infos, this, false);
3765  agg_col_range_cache.setColRange(phys_input, col_range);
3766  }
3767  }
3768  return agg_col_range_cache;
3769 }
Fragmenter_Namespace::TableInfo getTableInfo(const int table_id) const
Definition: Execute.cpp:294
const Catalog_Namespace::Catalog * catalog_
Definition: Execute.h:1053
const ColumnDescriptor * getMetadataForColumn(int tableId, const std::string &colName) const
ExpressionRange getLeafColumnRange(const Analyzer::ColumnVar *col_expr, const std::vector< InputTableInfo > &query_infos, const Executor *executor, const bool is_outer_join_proj)
#define CHECK(condition)
Definition: Logger.h:203
void setColRange(const PhysicalInput &, const ExpressionRange &)
static bool typeSupportsRange(const SQLTypeInfo &ti)

+ Here is the call graph for this function:

StringDictionaryGenerations Executor::computeStringDictionaryGenerations ( const std::unordered_set< PhysicalInput > &  phys_inputs)
private

Definition at line 3771 of file Execute.cpp.

References catalog_(), CHECK, kENCODING_DICT, and StringDictionaryGenerations::setGeneration().

3772  {
3773  StringDictionaryGenerations string_dictionary_generations;
3774  CHECK(catalog_);
3775  for (const auto& phys_input : phys_inputs) {
3776  const auto cd =
3777  catalog_->getMetadataForColumn(phys_input.table_id, phys_input.col_id);
3778  CHECK(cd);
3779  const auto& col_ti =
3780  cd->columnType.is_array() ? cd->columnType.get_elem_type() : cd->columnType;
3781  if (col_ti.is_string() && col_ti.get_compression() == kENCODING_DICT) {
3782  const int dict_id = col_ti.get_comp_param();
3783  const auto dd = catalog_->getMetadataForDict(dict_id);
3784  CHECK(dd && dd->stringDict);
3785  string_dictionary_generations.setGeneration(dict_id,
3786  dd->stringDict->storageEntryCount());
3787  }
3788  }
3789  return string_dictionary_generations;
3790 }
void setGeneration(const uint32_t id, const uint64_t generation)
const Catalog_Namespace::Catalog * catalog_
Definition: Execute.h:1053
const ColumnDescriptor * getMetadataForColumn(int tableId, const std::string &colName) const
const DictDescriptor * getMetadataForDict(int dict_ref, bool loadDict=true) const
Definition: Catalog.cpp:1494
#define CHECK(condition)
Definition: Logger.h:203

+ Here is the call graph for this function:

TableGenerations Executor::computeTableGenerations ( std::unordered_set< int >  phys_table_ids)
private

Definition at line 3792 of file Execute.cpp.

References TableGenerations::setGeneration().

3793  {
3794  TableGenerations table_generations;
3795  for (const int table_id : phys_table_ids) {
3796  const auto table_info = getTableInfo(table_id);
3797  table_generations.setGeneration(
3798  table_id,
3799  TableGeneration{static_cast<int64_t>(table_info.getPhysicalNumTuples()), 0});
3800  }
3801  return table_generations;
3802 }
Fragmenter_Namespace::TableInfo getTableInfo(const int table_id) const
Definition: Execute.cpp:294
void setGeneration(const uint32_t id, const TableGeneration &generation)

+ Here is the call graph for this function:

bool Executor::containsLeftDeepOuterJoin ( ) const
inline

Definition at line 415 of file Execute.h.

References cgen_state_.

415  {
416  return cgen_state_->contains_left_deep_outer_join_;
417  }
std::unique_ptr< CgenState > cgen_state_
Definition: Execute.h:1009
void Executor::createErrorCheckControlFlow ( llvm::Function *  query_func,
bool  run_with_dynamic_watchdog,
bool  run_with_allowing_runtime_interrupt,
ExecutorDeviceType  device_type,
const std::vector< InputTableInfo > &  input_table_infos 
)
private

Definition at line 1833 of file NativeCodegen.cpp.

1838  {
1839  AUTOMATIC_IR_METADATA(cgen_state_.get());
1840 
1841  // check whether the row processing was successful; currently, it can
1842  // fail by running out of group by buffer slots
1843 
1844  if (run_with_dynamic_watchdog && run_with_allowing_runtime_interrupt) {
1845  // when both dynamic watchdog and runtime interrupt turns on
1846  // we use dynamic watchdog
1847  run_with_allowing_runtime_interrupt = false;
1848  }
1849 
1850  {
1851  // disable injecting query interrupt checker if the session info is invalid
1852  mapd_shared_lock<mapd_shared_mutex> session_read_lock(executor_session_mutex_);
1853  if (current_query_session_.empty()) {
1854  run_with_allowing_runtime_interrupt = false;
1855  }
1856  }
1857 
1858  llvm::Value* row_count = nullptr;
1859  if ((run_with_dynamic_watchdog || run_with_allowing_runtime_interrupt) &&
1860  device_type == ExecutorDeviceType::GPU) {
1861  row_count =
1862  find_variable_in_basic_block<llvm::LoadInst>(query_func, ".entry", "row_count");
1863  }
1864 
1865  bool done_splitting = false;
1866  for (auto bb_it = query_func->begin(); bb_it != query_func->end() && !done_splitting;
1867  ++bb_it) {
1868  llvm::Value* pos = nullptr;
1869  for (auto inst_it = bb_it->begin(); inst_it != bb_it->end(); ++inst_it) {
1870  if ((run_with_dynamic_watchdog || run_with_allowing_runtime_interrupt) &&
1871  llvm::isa<llvm::PHINode>(*inst_it)) {
1872  if (inst_it->getName() == "pos") {
1873  pos = &*inst_it;
1874  }
1875  continue;
1876  }
1877  if (!llvm::isa<llvm::CallInst>(*inst_it)) {
1878  continue;
1879  }
1880  auto& row_func_call = llvm::cast<llvm::CallInst>(*inst_it);
1881  if (std::string(row_func_call.getCalledFunction()->getName()) == "row_process") {
1882  auto next_inst_it = inst_it;
1883  ++next_inst_it;
1884  auto new_bb = bb_it->splitBasicBlock(next_inst_it);
1885  auto& br_instr = bb_it->back();
1886  llvm::IRBuilder<> ir_builder(&br_instr);
1887  llvm::Value* err_lv = &*inst_it;
1888  llvm::Value* err_lv_returned_from_row_func = nullptr;
1889  if (run_with_dynamic_watchdog) {
1890  CHECK(pos);
1891  llvm::Value* call_watchdog_lv = nullptr;
1892  if (device_type == ExecutorDeviceType::GPU) {
1893  // In order to make sure all threads within a block see the same barrier,
1894  // only those blocks whose none of their threads have experienced the critical
1895  // edge will go through the dynamic watchdog computation
1896  CHECK(row_count);
1897  auto crit_edge_rem =
1898  (blockSize() & (blockSize() - 1))
1899  ? ir_builder.CreateSRem(
1900  row_count,
1901  cgen_state_->llInt(static_cast<int64_t>(blockSize())))
1902  : ir_builder.CreateAnd(
1903  row_count,
1904  cgen_state_->llInt(static_cast<int64_t>(blockSize() - 1)));
1905  auto crit_edge_threshold = ir_builder.CreateSub(row_count, crit_edge_rem);
1906  crit_edge_threshold->setName("crit_edge_threshold");
1907 
1908  // only those threads where pos < crit_edge_threshold go through dynamic
1909  // watchdog call
1910  call_watchdog_lv =
1911  ir_builder.CreateICmp(llvm::ICmpInst::ICMP_SLT, pos, crit_edge_threshold);
1912  } else {
1913  // CPU path: run watchdog for every 64th row
1914  auto dw_predicate = ir_builder.CreateAnd(pos, uint64_t(0x3f));
1915  call_watchdog_lv = ir_builder.CreateICmp(
1916  llvm::ICmpInst::ICMP_EQ, dw_predicate, cgen_state_->llInt(int64_t(0LL)));
1917  }
1918  CHECK(call_watchdog_lv);
1919  auto error_check_bb = bb_it->splitBasicBlock(
1920  llvm::BasicBlock::iterator(br_instr), ".error_check");
1921  auto& watchdog_br_instr = bb_it->back();
1922 
1923  auto watchdog_check_bb = llvm::BasicBlock::Create(
1924  cgen_state_->context_, ".watchdog_check", query_func, error_check_bb);
1925  llvm::IRBuilder<> watchdog_ir_builder(watchdog_check_bb);
1926  auto detected_timeout = watchdog_ir_builder.CreateCall(
1927  cgen_state_->module_->getFunction("dynamic_watchdog"), {});
1928  auto timeout_err_lv = watchdog_ir_builder.CreateSelect(
1929  detected_timeout, cgen_state_->llInt(Executor::ERR_OUT_OF_TIME), err_lv);
1930  watchdog_ir_builder.CreateBr(error_check_bb);
1931 
1932  llvm::ReplaceInstWithInst(
1933  &watchdog_br_instr,
1934  llvm::BranchInst::Create(
1935  watchdog_check_bb, error_check_bb, call_watchdog_lv));
1936  ir_builder.SetInsertPoint(&br_instr);
1937  auto unified_err_lv = ir_builder.CreatePHI(err_lv->getType(), 2);
1938 
1939  unified_err_lv->addIncoming(timeout_err_lv, watchdog_check_bb);
1940  unified_err_lv->addIncoming(err_lv, &*bb_it);
1941  err_lv = unified_err_lv;
1942  } else if (run_with_allowing_runtime_interrupt) {
1943  CHECK(pos);
1944  llvm::Value* call_check_interrupt_lv = nullptr;
1945  if (device_type == ExecutorDeviceType::GPU) {
1946  // approximate how many times the %pos variable
1947  // is increased --> the number of iteration
1948  // here we calculate the # bit shift by considering grid/block/fragment sizes
1949  // since if we use the fixed one (i.e., per 64-th increment)
1950  // some CUDA threads cannot enter the interrupt checking block depending on
1951  // the fragment size --> a thread may not take care of 64 threads if an outer
1952  // table is not sufficiently large, and so cannot be interrupted
1953  int32_t num_shift_by_gridDim = shared::getExpOfTwo(gridSize());
1954  int32_t num_shift_by_blockDim = shared::getExpOfTwo(blockSize());
1955  int total_num_shift = num_shift_by_gridDim + num_shift_by_blockDim;
1956  uint64_t interrupt_checking_freq = 32;
1957  auto freq_control_knob = g_running_query_interrupt_freq;
1958  CHECK_GT(freq_control_knob, 0);
1959  CHECK_LE(freq_control_knob, 1.0);
1960  if (!input_table_infos.empty()) {
1961  const auto& outer_table_info = *input_table_infos.begin();
1962  auto num_outer_table_tuples = outer_table_info.info.getNumTuples();
1963  if (outer_table_info.table_id < 0) {
1964  auto* rs = (*outer_table_info.info.fragments.begin()).resultSet;
1965  CHECK(rs);
1966  num_outer_table_tuples = rs->entryCount();
1967  } else {
1968  auto num_frags = outer_table_info.info.fragments.size();
1969  if (num_frags > 0) {
1970  num_outer_table_tuples =
1971  outer_table_info.info.fragments.begin()->getNumTuples();
1972  }
1973  }
1974  if (num_outer_table_tuples > 0) {
1975  // gridSize * blockSize --> pos_step (idx of the next row per thread)
1976  // we additionally multiply two to pos_step since the number of
1977  // dispatched blocks are double of the gridSize
1978  // # tuples (of fragment) / pos_step --> maximum # increment (K)
1979  // also we multiply 1 / freq_control_knob to K to control the frequency
1980  // So, needs to check the interrupt status more frequently? make K smaller
1981  auto max_inc = uint64_t(
1982  floor(num_outer_table_tuples / (gridSize() * blockSize() * 2)));
1983  if (max_inc < 2) {
1984  // too small `max_inc`, so this correction is necessary to make
1985  // `interrupt_checking_freq` be valid (i.e., larger than zero)
1986  max_inc = 2;
1987  }
1988  auto calibrated_inc = uint64_t(floor(max_inc * (1 - freq_control_knob)));
1989  interrupt_checking_freq =
1990  uint64_t(pow(2, shared::getExpOfTwo(calibrated_inc)));
1991  // add the coverage when interrupt_checking_freq > K
1992  // if so, some threads still cannot be branched to the interrupt checker
1993  // so we manually use smaller but close to the max_inc as freq
1994  if (interrupt_checking_freq > max_inc) {
1995  interrupt_checking_freq = max_inc / 2;
1996  }
1997  if (interrupt_checking_freq < 8) {
1998  // such small freq incurs too frequent interrupt status checking,
1999  // so we fixup to the minimum freq value at some reasonable degree
2000  interrupt_checking_freq = 8;
2001  }
2002  }
2003  }
2004  VLOG(1) << "Set the running query interrupt checking frequency: "
2005  << interrupt_checking_freq;
2006  // check the interrupt flag for every interrupt_checking_freq-th iteration
2007  llvm::Value* pos_shifted_per_iteration =
2008  ir_builder.CreateLShr(pos, cgen_state_->llInt(total_num_shift));
2009  auto interrupt_predicate =
2010  ir_builder.CreateAnd(pos_shifted_per_iteration, interrupt_checking_freq);
2011  call_check_interrupt_lv =
2012  ir_builder.CreateICmp(llvm::ICmpInst::ICMP_EQ,
2013  interrupt_predicate,
2014  cgen_state_->llInt(int64_t(0LL)));
2015  } else {
2016  // CPU path: run interrupt checker for every 64th row
2017  auto interrupt_predicate = ir_builder.CreateAnd(pos, uint64_t(0x3f));
2018  call_check_interrupt_lv =
2019  ir_builder.CreateICmp(llvm::ICmpInst::ICMP_EQ,
2020  interrupt_predicate,
2021  cgen_state_->llInt(int64_t(0LL)));
2022  }
2023  CHECK(call_check_interrupt_lv);
2024  auto error_check_bb = bb_it->splitBasicBlock(
2025  llvm::BasicBlock::iterator(br_instr), ".error_check");
2026  auto& check_interrupt_br_instr = bb_it->back();
2027 
2028  auto interrupt_check_bb = llvm::BasicBlock::Create(
2029  cgen_state_->context_, ".interrupt_check", query_func, error_check_bb);
2030  llvm::IRBuilder<> interrupt_checker_ir_builder(interrupt_check_bb);
2031  auto detected_interrupt = interrupt_checker_ir_builder.CreateCall(
2032  cgen_state_->module_->getFunction("check_interrupt"), {});
2033  auto interrupt_err_lv = interrupt_checker_ir_builder.CreateSelect(
2034  detected_interrupt, cgen_state_->llInt(Executor::ERR_INTERRUPTED), err_lv);
2035  interrupt_checker_ir_builder.CreateBr(error_check_bb);
2036 
2037  llvm::ReplaceInstWithInst(
2038  &check_interrupt_br_instr,
2039  llvm::BranchInst::Create(
2040  interrupt_check_bb, error_check_bb, call_check_interrupt_lv));
2041  ir_builder.SetInsertPoint(&br_instr);
2042  auto unified_err_lv = ir_builder.CreatePHI(err_lv->getType(), 2);
2043 
2044  unified_err_lv->addIncoming(interrupt_err_lv, interrupt_check_bb);
2045  unified_err_lv->addIncoming(err_lv, &*bb_it);
2046  err_lv = unified_err_lv;
2047  }
2048  if (!err_lv_returned_from_row_func) {
2049  err_lv_returned_from_row_func = err_lv;
2050  }
2051  if (device_type == ExecutorDeviceType::GPU && g_enable_dynamic_watchdog) {
2052  // let kernel execution finish as expected, regardless of the observed error,
2053  // unless it is from the dynamic watchdog where all threads within that block
2054  // return together.
2055  err_lv = ir_builder.CreateICmp(llvm::ICmpInst::ICMP_EQ,
2056  err_lv,
2057  cgen_state_->llInt(Executor::ERR_OUT_OF_TIME));
2058  } else {
2059  err_lv = ir_builder.CreateICmp(llvm::ICmpInst::ICMP_NE,
2060  err_lv,
2061  cgen_state_->llInt(static_cast<int32_t>(0)));
2062  }
2063  auto error_bb = llvm::BasicBlock::Create(
2064  cgen_state_->context_, ".error_exit", query_func, new_bb);
2065  const auto error_code_arg = get_arg_by_name(query_func, "error_code");
2066  llvm::CallInst::Create(
2067  cgen_state_->module_->getFunction("record_error_code"),
2068  std::vector<llvm::Value*>{err_lv_returned_from_row_func, error_code_arg},
2069  "",
2070  error_bb);
2071  llvm::ReturnInst::Create(cgen_state_->context_, error_bb);
2072  llvm::ReplaceInstWithInst(&br_instr,
2073  llvm::BranchInst::Create(error_bb, new_bb, err_lv));
2074  done_splitting = true;
2075  break;
2076  }
2077  }
2078  }
2079  CHECK(done_splitting);
2080 }
static mapd_shared_mutex executor_session_mutex_
Definition: Execute.h:1068
double g_running_query_interrupt_freq
Definition: Execute.cpp:118
static const int32_t ERR_INTERRUPTED
Definition: Execute.h:1116
std::unique_ptr< CgenState > cgen_state_
Definition: Execute.h:1009
bool g_enable_dynamic_watchdog
Definition: Execute.cpp:77
#define CHECK_GT(x, y)
Definition: Logger.h:215
unsigned getExpOfTwo(unsigned n)
Definition: MathUtils.cpp:23
llvm::Value * get_arg_by_name(llvm::Function *func, const std::string &name)
Definition: Execute.h:167
static const int32_t ERR_OUT_OF_TIME
Definition: Execute.h:1115
#define AUTOMATIC_IR_METADATA(CGENSTATE)
static QuerySessionId current_query_session_
Definition: Execute.h:1070
#define CHECK_LE(x, y)
Definition: Logger.h:214
unsigned gridSize() const
Definition: Execute.cpp:3324
#define CHECK(condition)
Definition: Logger.h:203
unsigned blockSize() const
Definition: Execute.cpp:3341
#define VLOG(n)
Definition: Logger.h:297
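
Lines 1953-2001 of the listing above derive how often a GPU thread branches into the runtime interrupt checker from the grid size, block size, outer table cardinality, and the g_running_query_interrupt_freq knob. The sketch below mirrors that arithmetic as a plain function so the heuristic can be followed in isolation; exp_of_two is an illustrative stand-in for shared::getExpOfTwo and is assumed to return the exponent of the largest power of two not exceeding its argument.

#include <cmath>
#include <cstdint>
#include <iostream>

// Illustrative stand-in for shared::getExpOfTwo: exponent of the largest power of
// two that does not exceed n (0 when n is 0 or 1).
unsigned exp_of_two(uint64_t n) {
  unsigned e = 0;
  while (n >>= 1) {
    ++e;
  }
  return e;
}

// Mirrors the heuristic in the listing: derive how many loop iterations a GPU
// thread performs before it branches into the interrupt checker.
uint64_t interrupt_checking_freq(uint64_t num_outer_table_tuples,
                                 unsigned grid_size,
                                 unsigned block_size,
                                 double freq_control_knob) {
  uint64_t freq = 32;  // default when the outer table is empty
  if (num_outer_table_tuples > 0) {
    // pos advances by gridSize * blockSize * 2 per iteration (blocks are dispatched
    // twice), so max_inc approximates the number of iterations per thread.
    auto max_inc = static_cast<uint64_t>(std::floor(
        num_outer_table_tuples / (static_cast<double>(grid_size) * block_size * 2)));
    if (max_inc < 2) {
      max_inc = 2;  // keep the derived frequency strictly positive
    }
    const auto calibrated_inc =
        static_cast<uint64_t>(std::floor(max_inc * (1 - freq_control_knob)));
    freq = uint64_t(1) << exp_of_two(calibrated_inc);
    if (freq > max_inc) {
      freq = max_inc / 2;  // make sure every thread can still reach the check
    }
    if (freq < 8) {
      freq = 8;  // avoid checking the interrupt flag too often
    }
  }
  return freq;
}

int main() {
  // 10M outer rows, grid of 64 blocks, 1024 threads per block, knob of 0.1.
  std::cout << interrupt_checking_freq(10'000'000, 64, 1024, 0.1) << "\n";  // prints 64
  return 0;
}

With those example inputs the heuristic lands on a checking frequency of 64 iterations per interrupt poll.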
std::vector< std::unique_ptr< ExecutionKernel > > Executor::createKernels ( SharedKernelContext &  shared_context,
const RelAlgExecutionUnit &  ra_exe_unit,
ColumnFetcher &  column_fetcher,
const std::vector< InputTableInfo > &  table_infos,
const ExecutionOptions eo,
const bool  is_agg,
const bool  allow_single_frag_table_opt,
const size_t  context_count,
const QueryCompilationDescriptor &  query_comp_desc,
const QueryMemoryDescriptor &  query_mem_desc,
RenderInfo *  render_info,
std::unordered_set< int > &  available_gpus,
int &  available_cpus 
)
private

Determines execution dispatch mode and required fragments for a given query step, then creates kernels to execute the query and returns them for launch.

Definition at line 2046 of file Execute.cpp.

References ExecutionOptions::allow_multifrag, catalog_(), CHECK, CHECK_GE, CHECK_GT, anonymous_namespace{Execute.cpp}::checkWorkUnitWatchdog(), g_inner_join_fragment_skipping, QueryCompilationDescriptor::getDeviceType(), QueryMemoryDescriptor::getEntryCount(), SharedKernelContext::getFragOffsets(), QueryMemoryDescriptor::getQueryDescriptionType(), GPU, ExecutionOptions::gpu_input_mem_limit_percent, Data_Namespace::GPU_LEVEL, anonymous_namespace{Execute.cpp}::has_lazy_fetched_columns(), logger::INFO, RelAlgExecutionUnit::input_descs, KernelPerFragment, LOG, MultifragmentKernel, ExecutionOptions::outer_fragment_indices, Projection, query_mem_desc, RelAlgExecutionUnit::target_exprs, QueryMemoryDescriptor::toString(), RelAlgExecutionUnit::use_bump_allocator, VLOG, and ExecutionOptions::with_watchdog.

2059  {
2060  std::vector<std::unique_ptr<ExecutionKernel>> execution_kernels;
2061 
2062  QueryFragmentDescriptor fragment_descriptor(
2063  ra_exe_unit,
2064  table_infos,
2065  query_comp_desc.getDeviceType() == ExecutorDeviceType::GPU
2066  ? catalog_->getDataMgr().getMemoryInfo(Data_Namespace::MemoryLevel::GPU_LEVEL)
2067  : std::vector<Data_Namespace::MemoryInfo>{},
2068  eo.gpu_input_mem_limit_percent,
2069  eo.outer_fragment_indices);
2070  CHECK(!ra_exe_unit.input_descs.empty());
2071 
2072  const auto device_type = query_comp_desc.getDeviceType();
2073  const bool uses_lazy_fetch =
2074  plan_state_->allow_lazy_fetch_ &&
2075  has_lazy_fetched_columns(getColLazyFetchInfo(ra_exe_unit.target_exprs));
2076  const bool use_multifrag_kernel = (device_type == ExecutorDeviceType::GPU) &&
2077  eo.allow_multifrag && (!uses_lazy_fetch || is_agg);
2078  const auto device_count = deviceCount(device_type);
2079  CHECK_GT(device_count, 0);
2080 
2081  fragment_descriptor.buildFragmentKernelMap(ra_exe_unit,
2082  shared_context.getFragOffsets(),
2083  device_count,
2084  device_type,
2085  use_multifrag_kernel,
2086  g_inner_join_fragment_skipping,
2087  this);
2088  if (eo.with_watchdog && fragment_descriptor.shouldCheckWorkUnitWatchdog()) {
2089  checkWorkUnitWatchdog(ra_exe_unit, table_infos, *catalog_, device_type, device_count);
2090  }
2091 
2092  if (use_multifrag_kernel) {
2093  VLOG(1) << "Creating multifrag execution kernels";
2094  VLOG(1) << query_mem_desc.toString();
2095 
2096  // NB: We should never be on this path when the query is retried because of running
2097  // out of group by slots; also, for scan only queries on CPU we want the
2098  // high-granularity, fragment by fragment execution instead. For scan only queries on
2099  // GPU, we want the multifrag kernel path to save the overhead of allocating an output
2100  // buffer per fragment.
2101  auto multifrag_kernel_dispatch = [&ra_exe_unit,
2102  &execution_kernels,
2103  &column_fetcher,
2104  &eo,
2105  &query_comp_desc,
2106  &query_mem_desc,
2107  render_info](const int device_id,
2108  const FragmentsList& frag_list,
2109  const int64_t rowid_lookup_key) {
2110  execution_kernels.emplace_back(
2111  std::make_unique<ExecutionKernel>(ra_exe_unit,
2112  ExecutorDeviceType::GPU,
2113  device_id,
2114  eo,
2115  column_fetcher,
2116  query_comp_desc,
2117  query_mem_desc,
2118  frag_list,
2119  ExecutorDispatchMode::MultifragmentKernel,
2120  render_info,
2121  rowid_lookup_key));
2122  };
2123  fragment_descriptor.assignFragsToMultiDispatch(multifrag_kernel_dispatch);
2124  } else {
2125  VLOG(1) << "Creating one execution kernel per fragment";
2126  VLOG(1) << query_mem_desc.toString();
2127 
2128  if (!ra_exe_unit.use_bump_allocator && allow_single_frag_table_opt &&
2129  (query_mem_desc.getQueryDescriptionType() == QueryDescriptionType::Projection) &&
2130  table_infos.size() == 1 && table_infos.front().table_id > 0) {
2131  const auto max_frag_size =
2132  table_infos.front().info.getFragmentNumTuplesUpperBound();
2133  if (max_frag_size < query_mem_desc.getEntryCount()) {
2134  LOG(INFO) << "Lowering scan limit from " << query_mem_desc.getEntryCount()
2135  << " to match max fragment size " << max_frag_size
2136  << " for kernel per fragment execution path.";
2137  throw CompilationRetryNewScanLimit(max_frag_size);
2138  }
2139  }
2140 
2141  size_t frag_list_idx{0};
2142  auto fragment_per_kernel_dispatch = [&ra_exe_unit,
2143  &execution_kernels,
2144  &column_fetcher,
2145  &eo,
2146  &frag_list_idx,
2147  &device_type,
2148  &query_comp_desc,
2149  &query_mem_desc,
2150  render_info](const int device_id,
2151  const FragmentsList& frag_list,
2152  const int64_t rowid_lookup_key) {
2153  if (!frag_list.size()) {
2154  return;
2155  }
2156  CHECK_GE(device_id, 0);
2157 
2158  execution_kernels.emplace_back(
2159  std::make_unique<ExecutionKernel>(ra_exe_unit,
2160  device_type,
2161  device_id,
2162  eo,
2163  column_fetcher,
2164  query_comp_desc,
2165  query_mem_desc,
2166  frag_list,
2167  ExecutorDispatchMode::KernelPerFragment,
2168  render_info,
2169  rowid_lookup_key));
2170  ++frag_list_idx;
2171  };
2172 
2173  fragment_descriptor.assignFragsToKernelDispatch(fragment_per_kernel_dispatch,
2174  ra_exe_unit);
2175  }
2176 
2177  return execution_kernels;
2178 }
bool is_agg(const Analyzer::Expr *expr)
std::vector< Analyzer::Expr * > target_exprs
ExecutorDeviceType getDeviceType() const
const std::vector< uint64_t > & getFragOffsets()
std::string toString() const
Data_Namespace::DataMgr & getDataMgr() const
Definition: Catalog.h:223
#define LOG(tag)
Definition: Logger.h:194
std::vector< size_t > outer_fragment_indices
std::vector< ColumnLazyFetchInfo > getColLazyFetchInfo(const std::vector< Analyzer::Expr * > &target_exprs) const
Definition: Execute.cpp:336
std::vector< InputDescriptor > input_descs
#define CHECK_GE(x, y)
Definition: Logger.h:216
int deviceCount(const ExecutorDeviceType) const
Definition: Execute.cpp:645
#define CHECK_GT(x, y)
Definition: Logger.h:215
std::vector< FragmentsPerTable > FragmentsList
bool g_inner_join_fragment_skipping
Definition: Execute.cpp:85
const Catalog_Namespace::Catalog * catalog_
Definition: Execute.h:1053
std::unique_ptr< PlanState > plan_state_
Definition: Execute.h:1024
void checkWorkUnitWatchdog(const RelAlgExecutionUnit &ra_exe_unit, const std::vector< InputTableInfo > &table_infos, const Catalog_Namespace::Catalog &cat, const ExecutorDeviceType device_type, const int device_count)
Definition: Execute.cpp:1108
std::vector< MemoryInfo > getMemoryInfo(const MemoryLevel memLevel)
Definition: DataMgr.cpp:303
#define CHECK(condition)
Definition: Logger.h:203
double gpu_input_mem_limit_percent
bool has_lazy_fetched_columns(const std::vector< ColumnLazyFetchInfo > &fetched_cols)
Definition: Execute.cpp:2035
#define VLOG(n)
Definition: Logger.h:297

+ Here is the call graph for this function:
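
The brief description above notes that createKernels first determines the execution dispatch mode. In the listing that choice reduces to a single predicate (line 2076 onward): multi-fragment kernels are only used on GPU, only when the execution options allow them, and only when lazily fetched columns do not force per-fragment execution (aggregate queries are exempt). A condensed sketch of that predicate, with plain parameters standing in for the executor and plan state:

#include <iostream>

enum class DeviceType { CPU, GPU };

// Condensed form of the dispatch-mode predicate in the listing: multi-fragment
// kernels require GPU execution, permission from the execution options, and
// either no lazily fetched columns or an aggregate query.
bool use_multifrag_kernel(DeviceType device_type,
                          bool allow_multifrag,
                          bool uses_lazy_fetch,
                          bool is_agg) {
  return device_type == DeviceType::GPU && allow_multifrag &&
         (!uses_lazy_fetch || is_agg);
}

int main() {
  std::cout << std::boolalpha
            << use_multifrag_kernel(DeviceType::GPU, true, false, false) << " "   // true
            << use_multifrag_kernel(DeviceType::GPU, true, true, false) << " "    // false
            << use_multifrag_kernel(DeviceType::CPU, true, false, true) << "\n";  // false
  return 0;
}

In the listing the same condition then selects between assignFragsToMultiDispatch and assignFragsToKernelDispatch.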

int Executor::deviceCount ( const ExecutorDeviceType  device_type) const
private

Definition at line 645 of file Execute.cpp.

References catalog_(), CHECK, and GPU.

645  {
646  if (device_type == ExecutorDeviceType::GPU) {
647  const auto cuda_mgr = catalog_->getDataMgr().getCudaMgr();
648  CHECK(cuda_mgr);
649  return cuda_mgr->getDeviceCount();
650  } else {
651  return 1;
652  }
653 }
CudaMgr_Namespace::CudaMgr * getCudaMgr() const
Definition: DataMgr.h:207
Data_Namespace::DataMgr & getDataMgr() const
Definition: Catalog.h:223
const Catalog_Namespace::Catalog * catalog_
Definition: Execute.h:1053
#define CHECK(condition)
Definition: Logger.h:203

+ Here is the call graph for this function:

int Executor::deviceCountForMemoryLevel ( const Data_Namespace::MemoryLevel  memory_level) const
private

Definition at line 655 of file Execute.cpp.

References CPU, GPU, and Data_Namespace::GPU_LEVEL.

656  {
657  return memory_level == GPU_LEVEL ? deviceCount(ExecutorDeviceType::GPU)
658  : deviceCount(ExecutorDeviceType::CPU);
659 }
ExecutorDeviceType
int deviceCount(const ExecutorDeviceType) const
Definition: Execute.cpp:645
int64_t Executor::deviceCycles ( int  milliseconds) const
private

Definition at line 3355 of file Execute.cpp.

References catalog_(), and CHECK.

3355  {
3356  CHECK(catalog_);
3357  const auto cuda_mgr = catalog_->getDataMgr().getCudaMgr();
3358  CHECK(cuda_mgr);
3359  const auto& dev_props = cuda_mgr->getAllDeviceProperties();
3360  return static_cast<int64_t>(dev_props.front().clockKhz) * milliseconds;
3361 }
CudaMgr_Namespace::CudaMgr * getCudaMgr() const
Definition: DataMgr.h:207
Data_Namespace::DataMgr & getDataMgr() const
Definition: Catalog.h:223
const Catalog_Namespace::Catalog * catalog_
Definition: Execute.h:1053
#define CHECK(condition)
Definition: Logger.h:203

+ Here is the call graph for this function:
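
Since clockKhz from getAllDeviceProperties() is the first device's clock rate in kilohertz, i.e. cycles per millisecond, the product clockKhz * milliseconds is a cycle count directly. For example, on a hypothetical device clocked at 1,410,000 kHz (1.41 GHz), deviceCycles(100) evaluates to 1,410,000 × 100 = 141,000,000 cycles.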

void Executor::enableRuntimeQueryInterrupt ( const double  runtime_query_check_freq,
const unsigned  pending_query_check_freq 
) const

Definition at line 4152 of file Execute.cpp.

References g_enable_runtime_query_interrupt, g_pending_query_interrupt_freq, and g_running_query_interrupt_freq.

4154  {
4155  // The only one scenario that we intentionally call this function is
4156  // to allow runtime query interrupt in QueryRunner for test cases.
4157  // Because test machine's default setting does not allow runtime query interrupt,
4158  // so we have to turn it on within test code if necessary.
4160  g_pending_query_interrupt_freq = pending_query_check_freq;
4161  g_running_query_interrupt_freq = runtime_query_check_freq;
4164  }
4165 }
double g_running_query_interrupt_freq
Definition: Execute.cpp:118
unsigned g_pending_query_interrupt_freq
Definition: Execute.cpp:117
bool g_enable_runtime_query_interrupt
Definition: Execute.cpp:114
void Executor::enrollQuerySession ( const QuerySessionId query_session,
const std::string &  query_str,
const std::string &  submitted_time_str,
const size_t  executor_id,
const QuerySessionStatus::QueryStatus  query_session_status 
)

Definition at line 3959 of file Execute.cpp.

References executor_id_().

3964  {
3965  // enroll the query session into the Executor's session map
3966  mapd_unique_lock<mapd_shared_mutex> session_write_lock(executor_session_mutex_);
3967  if (query_session.empty()) {
3968  return;
3969  }
3970 
3971  addToQuerySessionList(query_session,
3972  query_str,
3973  submitted_time_str,
3974  executor_id,
3975  query_session_status,
3976  session_write_lock);
3977 
3978  if (query_session_status == QuerySessionStatus::QueryStatus::RUNNING) {
3979  current_query_session_ = query_session;
3980  running_query_executor_id_ = executor_id;
3981  }
3982 }
static mapd_shared_mutex executor_session_mutex_
Definition: Execute.h:1068
const ExecutorId executor_id_
Definition: Execute.h:1052
bool addToQuerySessionList(const QuerySessionId &query_session, const std::string &query_str, const std::string &submitted, const size_t executor_id, const QuerySessionStatus::QueryStatus query_status, mapd_unique_lock< mapd_shared_mutex > &write_lock)
Definition: Execute.cpp:3984
static QuerySessionId current_query_session_
Definition: Execute.h:1070
static size_t running_query_executor_id_
Definition: Execute.h:1072

+ Here is the call graph for this function:

ResultSetPtr Executor::executeExplain ( const QueryCompilationDescriptor &  query_comp_desc)
private

Definition at line 1718 of file Execute.cpp.

References QueryCompilationDescriptor::getIR().

1718  {
1719  return std::make_shared<ResultSet>(query_comp_desc.getIR());
1720 }

+ Here is the call graph for this function:

int32_t Executor::executePlanWithGroupBy ( const RelAlgExecutionUnit ra_exe_unit,
const CompilationResult &  compilation_result,
const bool  hoist_literals,
ResultSetPtr &  results,
const ExecutorDeviceType  device_type,
std::vector< std::vector< const int8_t * >> &  col_buffers,
const std::vector< size_t >  outer_tab_frag_ids,
QueryExecutionContext *  query_exe_context,
const std::vector< std::vector< int64_t >> &  num_rows,
const std::vector< std::vector< uint64_t >> &  frag_offsets,
Data_Namespace::DataMgr *  data_mgr,
const int  device_id,
const int  outer_table_id,
const int64_t  limit,
const uint32_t  start_rowid,
const uint32_t  num_tables,
const bool  allow_runtime_interrupt,
RenderInfo *  render_info 
)
private

Definition at line 3054 of file Execute.cpp.

References CHECK, CHECK_NE, anonymous_namespace{Execute.cpp}::check_rows_less_than_needed(), CPU, DEBUG_TIMER, ERR_DIV_BY_ZERO, ERR_GEOS, ERR_INTERRUPTED, ERR_OUT_OF_TIME, ERR_OVERFLOW_OR_UNDERFLOW, ERR_SINGLE_VALUE_FOUND_MULTIPLE_VALUES, logger::FATAL, g_enable_dynamic_watchdog, CompilationResult::generated_code, QueryMemoryDescriptor::getEntryCount(), QueryExecutionContext::getRowSet(), GpuSharedMemoryContext::getSharedMemorySize(), GPU, CompilationResult::gpu_smem_context, RelAlgExecutionUnit::groupby_exprs, INJECT_TIMER, RelAlgExecutionUnit::input_col_descs, RelAlgExecutionUnit::input_descs, QueryExecutionContext::launchCpuCode(), QueryExecutionContext::launchGpuCode(), CompilationResult::literal_values, LOG, shared::printContainer(), QueryExecutionContext::query_buffers_, QueryExecutionContext::query_mem_desc_, RenderInfo::render_allocator_map_ptr, RelAlgExecutionUnit::scan_limit, QueryMemoryDescriptor::setEntryCount(), RelAlgExecutionUnit::union_all, RenderInfo::useCudaBuffers(), and VLOG.

3072  {
3073  auto timer = DEBUG_TIMER(__func__);
3074  INJECT_TIMER(executePlanWithGroupBy);
3075  CHECK(!results);
3076  if (col_buffers.empty()) {
3077  return 0;
3078  }
3079  CHECK_NE(ra_exe_unit.groupby_exprs.size(), size_t(0));
3080  // TODO(alex):
3081  // 1. Optimize size (make keys more compact).
3082  // 2. Resize on overflow.
3083  // 3. Optimize runtime.
3084  auto hoist_buf = serializeLiterals(compilation_result.literal_values, device_id);
3085  int32_t error_code = device_type == ExecutorDeviceType::GPU ? 0 : start_rowid;
3086  const auto join_hash_table_ptrs = getJoinHashTablePtrs(device_type, device_id);
3087  if (allow_runtime_interrupt) {
3088  bool isInterrupted = false;
3089  {
3090  mapd_shared_lock<mapd_shared_mutex> session_read_lock(executor_session_mutex_);
3091  const auto query_session = getCurrentQuerySession(session_read_lock);
3092  isInterrupted = checkIsQuerySessionInterrupted(query_session, session_read_lock);
3093  }
3094  if (isInterrupted) {
3096  }
3097  }
3098  if (g_enable_dynamic_watchdog && interrupted_.load()) {
3099  return ERR_INTERRUPTED;
3100  }
3101 
3102  RenderAllocatorMap* render_allocator_map_ptr = nullptr;
3103  if (render_info && render_info->useCudaBuffers()) {
3104  render_allocator_map_ptr = render_info->render_allocator_map_ptr.get();
3105  }
3106 
3107  VLOG(2) << "bool(ra_exe_unit.union_all)=" << bool(ra_exe_unit.union_all)
3108  << " ra_exe_unit.input_descs="
3109  << shared::printContainer(ra_exe_unit.input_descs)
3110  << " ra_exe_unit.input_col_descs="
3111  << shared::printContainer(ra_exe_unit.input_col_descs)
3112  << " ra_exe_unit.scan_limit=" << ra_exe_unit.scan_limit
3113  << " num_rows=" << shared::printContainer(num_rows)
3114  << " frag_offsets=" << shared::printContainer(frag_offsets)
3115  << " query_exe_context->query_buffers_->num_rows_="
3116  << query_exe_context->query_buffers_->num_rows_
3117  << " query_exe_context->query_mem_desc_.getEntryCount()="
3118  << query_exe_context->query_mem_desc_.getEntryCount()
3119  << " device_id=" << device_id << " outer_table_id=" << outer_table_id
3120  << " scan_limit=" << scan_limit << " start_rowid=" << start_rowid
3121  << " num_tables=" << num_tables;
3122 
3123  RelAlgExecutionUnit ra_exe_unit_copy = ra_exe_unit;
3124  // For UNION ALL, filter out input_descs and input_col_descs that are not associated
3125  // with outer_table_id.
3126  if (ra_exe_unit_copy.union_all) {
3127  // Sort outer_table_id first, then pop the rest off of ra_exe_unit_copy.input_descs.
3128  std::stable_sort(ra_exe_unit_copy.input_descs.begin(),
3129  ra_exe_unit_copy.input_descs.end(),
3130  [outer_table_id](auto const& a, auto const& b) {
3131  return a.getTableId() == outer_table_id &&
3132  b.getTableId() != outer_table_id;
3133  });
3134  while (!ra_exe_unit_copy.input_descs.empty() &&
3135  ra_exe_unit_copy.input_descs.back().getTableId() != outer_table_id) {
3136  ra_exe_unit_copy.input_descs.pop_back();
3137  }
3138  // Filter ra_exe_unit_copy.input_col_descs.
3139  ra_exe_unit_copy.input_col_descs.remove_if(
3140  [outer_table_id](auto const& input_col_desc) {
3141  return input_col_desc->getScanDesc().getTableId() != outer_table_id;
3142  });
3143  query_exe_context->query_mem_desc_.setEntryCount(ra_exe_unit_copy.scan_limit);
3144  }
3145 
3146  if (device_type == ExecutorDeviceType::CPU) {
3147  auto cpu_generated_code = std::dynamic_pointer_cast<CpuCompilationContext>(
3148  compilation_result.generated_code);
3149  CHECK(cpu_generated_code);
3150  query_exe_context->launchCpuCode(
3151  ra_exe_unit_copy,
3152  cpu_generated_code.get(),
3153  hoist_literals,
3154  hoist_buf,
3155  col_buffers,
3156  num_rows,
3157  frag_offsets,
3158  ra_exe_unit_copy.union_all ? ra_exe_unit_copy.scan_limit : scan_limit,
3159  &error_code,
3160  num_tables,
3161  join_hash_table_ptrs);
3162  } else {
3163  try {
3164  auto gpu_generated_code = std::dynamic_pointer_cast<GpuCompilationContext>(
3165  compilation_result.generated_code);
3166  CHECK(gpu_generated_code);
3167  query_exe_context->launchGpuCode(
3168  ra_exe_unit_copy,
3169  gpu_generated_code.get(),
3170  hoist_literals,
3171  hoist_buf,
3172  col_buffers,
3173  num_rows,
3174  frag_offsets,
3175  ra_exe_unit_copy.union_all ? ra_exe_unit_copy.scan_limit : scan_limit,
3176  data_mgr,
3177  blockSize(),
3178  gridSize(),
3179  device_id,
3180  compilation_result.gpu_smem_context.getSharedMemorySize(),
3181  &error_code,
3182  num_tables,
3183  allow_runtime_interrupt,
3184  join_hash_table_ptrs,
3185  render_allocator_map_ptr);
3186  } catch (const OutOfMemory&) {
3187  return ERR_OUT_OF_GPU_MEM;
3188  } catch (const OutOfRenderMemory&) {
3189  return ERR_OUT_OF_RENDER_MEM;
3190  } catch (const StreamingTopNNotSupportedInRenderQuery&) {
3191  return ERR_STREAMING_TOP_N_NOT_SUPPORTED_IN_RENDER_QUERY;
3192  } catch (const std::exception& e) {
3193  LOG(FATAL) << "Error launching the GPU kernel: " << e.what();
3194  }
3195  }
3196 
3197  if (error_code == Executor::ERR_OVERFLOW_OR_UNDERFLOW ||
3198  error_code == Executor::ERR_DIV_BY_ZERO ||
3199  error_code == Executor::ERR_OUT_OF_TIME ||
3200  error_code == Executor::ERR_INTERRUPTED ||
3201  error_code == Executor::ERR_SINGLE_VALUE_FOUND_MULTIPLE_VALUES ||
3202  error_code == Executor::ERR_GEOS) {
3203  return error_code;
3204  }
3205 
3206  if (error_code != Executor::ERR_OVERFLOW_OR_UNDERFLOW &&
3207  error_code != Executor::ERR_DIV_BY_ZERO && !render_allocator_map_ptr) {
3208  results = query_exe_context->getRowSet(ra_exe_unit_copy,
3209  query_exe_context->query_mem_desc_);
3210  CHECK(results);
3211  VLOG(2) << "results->rowCount()=" << results->rowCount();
3212  results->holdLiterals(hoist_buf);
3213  }
3214  if (error_code < 0 && render_allocator_map_ptr) {
3215  auto const adjusted_scan_limit =
3216  ra_exe_unit_copy.union_all ? ra_exe_unit_copy.scan_limit : scan_limit;
3217  // More rows passed the filter than available slots. We don't have a count to check,
3218  // so assume we met the limit if a scan limit is set
3219  if (adjusted_scan_limit != 0) {
3220  return 0;
3221  } else {
3222  return error_code;
3223  }
3224  }
3225  if (error_code && (!scan_limit || check_rows_less_than_needed(results, scan_limit))) {
3226  return error_code; // unlucky, not enough results and we ran out of slots
3227  }
3228 
3229  return 0;
3230 }

+ Here is the call graph for this function:

int32_t Executor::executePlanWithoutGroupBy ( const RelAlgExecutionUnit &  ra_exe_unit,
const CompilationResult &  compilation_result,
const bool  hoist_literals,
ResultSetPtr &  results,
const std::vector< Analyzer::Expr * > &  target_exprs,
const ExecutorDeviceType  device_type,
std::vector< std::vector< const int8_t * >> &  col_buffers,
QueryExecutionContext *  query_exe_context,
const std::vector< std::vector< int64_t >> &  num_rows,
const std::vector< std::vector< uint64_t >> &  frag_offsets,
Data_Namespace::DataMgr *  data_mgr,
const int  device_id,
const uint32_t  start_rowid,
const uint32_t  num_tables,
const bool  allow_runtime_interrupt,
RenderInfo *  render_info 
)
private

Definition at line 2842 of file Execute.cpp.

References CHECK, CHECK_EQ, CPU, DEBUG_TIMER, ERR_DIV_BY_ZERO, ERR_GEOS, ERR_INTERRUPTED, ERR_OUT_OF_TIME, ERR_OVERFLOW_OR_UNDERFLOW, ERR_SINGLE_VALUE_FOUND_MULTIPLE_VALUES, RelAlgExecutionUnit::estimator, QueryExecutionContext::estimator_result_set_, logger::FATAL, g_bigint_count, g_enable_dynamic_watchdog, CompilationResult::generated_code, get_target_info(), QueryExecutionContext::getAggInitValForIndex(), QueryMemoryDescriptor::getPaddedSlotWidthBytes(), GpuSharedMemoryContext::getSharedMemorySize(), GPU, CompilationResult::gpu_smem_context, i, INJECT_TIMER, is_distinct_target(), RenderInfo::isPotentialInSituRender(), GpuSharedMemoryContext::isSharedMemoryUsed(), kAPPROX_COUNT_DISTINCT, kAPPROX_MEDIAN, kAVG, kCOUNT, kSAMPLE, QueryExecutionContext::launchCpuCode(), QueryExecutionContext::launchGpuCode(), CompilationResult::literal_values, LOG, QueryExecutionContext::query_buffers_, QueryExecutionContext::query_mem_desc_, reduceResults(), RenderInfo::render_allocator_map_ptr, takes_float_argument(), and RenderInfo::useCudaBuffers().
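
As the reference list above suggests, the function signals failure through the static Executor::ERR_* codes rather than exceptions: it returns 0 on success and a non-zero code otherwise, leaving the caller to decide how to surface the problem. A purely illustrative, hypothetical caller-side translation (the constants below are stand-ins; the real values are the Executor::ERR_* members declared in Execute.h, and the actual engine handles these codes higher up the call stack):

  #include <cstdint>
  #include <stdexcept>
  #include <string>

  // Stand-in values for illustration only; not the engine's real codes.
  constexpr int32_t kErrDivByZero = 1;
  constexpr int32_t kErrOverflowOrUnderflow = 2;
  constexpr int32_t kErrInterrupted = 3;

  // Hypothetical helper: turn the int32_t returned by the execute*Plan*
  // methods into an exception an upper layer could report to the client.
  void throw_on_error(const int32_t error_code) {
    switch (error_code) {
      case 0:
        return;  // success: the result buffers are valid
      case kErrDivByZero:
        throw std::runtime_error("Division by zero");
      case kErrOverflowOrUnderflow:
        throw std::runtime_error("Overflow or underflow");
      case kErrInterrupted:
        throw std::runtime_error("Query execution has been interrupted");
      default:
        throw std::runtime_error("Query execution failed with error code " +
                                 std::to_string(error_code));
    }
  }

The full definition follows below.
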

2858  {
2860  auto timer = DEBUG_TIMER(__func__);
2861  CHECK(!results);
2862  if (col_buffers.empty()) {
2863  return 0;
2864  }
2865 
2866  RenderAllocatorMap* render_allocator_map_ptr = nullptr;
2867  if (render_info) {
2868  // TODO(adb): make sure that we either never get here in the CPU case, or if we do get
2869  // here, we are in non-insitu mode.
2870  CHECK(render_info->useCudaBuffers() || !render_info->isPotentialInSituRender())
2871  << "CUDA disabled rendering in the executePlanWithoutGroupBy query path is "
2872  "currently unsupported.";
2873  render_allocator_map_ptr = render_info->render_allocator_map_ptr.get();
2874  }
2875 
2876  int32_t error_code = device_type == ExecutorDeviceType::GPU ? 0 : start_rowid;
2877  std::vector<int64_t*> out_vec;
2878  const auto hoist_buf = serializeLiterals(compilation_result.literal_values, device_id);
2879  const auto join_hash_table_ptrs = getJoinHashTablePtrs(device_type, device_id);
2880  std::unique_ptr<OutVecOwner> output_memory_scope;
2881  if (allow_runtime_interrupt) {
2882  bool isInterrupted = false;
2883  {
2884  mapd_shared_lock<mapd_shared_mutex> session_read_lock(executor_session_mutex_);
2885  const auto query_session = getCurrentQuerySession(session_read_lock);
2886  isInterrupted = checkIsQuerySessionInterrupted(query_session, session_read_lock);
2887  }
2888  if (isInterrupted) {
2889  return ERR_INTERRUPTED;
2890  }
2891  }
2892  if (g_enable_dynamic_watchdog && interrupted_.load()) {
2893  return ERR_INTERRUPTED;
2894  }
2895  if (device_type == ExecutorDeviceType::CPU) {
2896  auto cpu_generated_code = std::dynamic_pointer_cast<CpuCompilationContext>(
2897  compilation_result.generated_code);
2898  CHECK(cpu_generated_code);
2899  out_vec = query_exe_context->launchCpuCode(ra_exe_unit,
2900  cpu_generated_code.get(),
2901  hoist_literals,
2902  hoist_buf,
2903  col_buffers,
2904  num_rows,
2905  frag_offsets,
2906  0,
2907  &error_code,
2908  num_tables,
2909  join_hash_table_ptrs);
2910  output_memory_scope.reset(new OutVecOwner(out_vec));
2911  } else {
2912  auto gpu_generated_code = std::dynamic_pointer_cast<GpuCompilationContext>(
2913  compilation_result.generated_code);
2914  CHECK(gpu_generated_code);
2915  try {
2916  out_vec = query_exe_context->launchGpuCode(
2917  ra_exe_unit,
2918  gpu_generated_code.get(),
2919  hoist_literals,
2920  hoist_buf,
2921  col_buffers,
2922  num_rows,
2923  frag_offsets,
2924  0,
2925  data_mgr,
2926  blockSize(),
2927  gridSize(),
2928  device_id,
2929  compilation_result.gpu_smem_context.getSharedMemorySize(),
2930  &error_code,
2931  num_tables,
2932  allow_runtime_interrupt,
2933  join_hash_table_ptrs,
2934  render_allocator_map_ptr);
2935  output_memory_scope.reset(new OutVecOwner(out_vec));
2936  } catch (const OutOfMemory&) {
2937  return ERR_OUT_OF_GPU_MEM;
2938  } catch (const std::exception& e) {
2939  LOG(FATAL) << "Error launching the GPU kernel: " << e.what();
2940  }
2941  }
2942  if (error_code == Executor::ERR_OVERFLOW_OR_UNDERFLOW ||
2943  error_code == Executor::ERR_DIV_BY_ZERO ||
2944  error_code == Executor::ERR_OUT_OF_TIME ||
2945  error_code == Executor::ERR_INTERRUPTED ||
2946  error_code == Executor::ERR_SINGLE_VALUE_FOUND_MULTIPLE_VALUES ||
2947  error_code == Executor::ERR_GEOS) {
2948  return error_code;
2949  }
2950  if (ra_exe_unit.estimator) {
2951  CHECK(!error_code);
2952  results =
2953  std::shared_ptr<ResultSet>(query_exe_context->estimator_result_set_.release());
2954  return 0;
2955  }
2956  std::vector<int64_t> reduced_outs;
2957  const auto num_frags = col_buffers.size();
2958  const size_t entry_count =
2959  device_type == ExecutorDeviceType::GPU
2960  ? (compilation_result.gpu_smem_context.isSharedMemoryUsed()
2961  ? 1
2962  : blockSize() * gridSize() * num_frags)
2963  : num_frags;
2964  if (size_t(1) == entry_count) {
2965  for (auto out : out_vec) {
2966  CHECK(out);
2967  reduced_outs.push_back(*out);
2968  }
2969  } else {
2970  size_t out_vec_idx = 0;
2971 
2972  for (const auto target_expr : target_exprs) {
2973  const auto agg_info = get_target_info(target_expr, g_bigint_count);
2974  CHECK(agg_info.is_agg);
2975 
2976  const int num_iterations = agg_info.sql_type.is_geometry()
2977  ? agg_info.sql_type.get_physical_coord_cols()
2978  : 1;
2979 
2980  for (int i = 0; i < num_iterations; i++) {
2981  int64_t val1;
2982  const bool float_argument_input = takes_float_argument(agg_info);
2983  if (is_distinct_target(agg_info) || agg_info.agg_kind == kAPPROX_MEDIAN) {
2984  CHECK(agg_info.agg_kind == kCOUNT ||
2985  agg_info.agg_kind == kAPPROX_COUNT_DISTINCT ||
2986  agg_info.agg_kind == kAPPROX_MEDIAN);
2987  val1 = out_vec[out_vec_idx][0];
2988  error_code = 0;
2989  } else {
2990  const auto chosen_bytes = static_cast<size_t>(
2991  query_exe_context->query_mem_desc_.getPaddedSlotWidthBytes(out_vec_idx));
2992  std::tie(val1, error_code) = Executor::reduceResults(
2993  agg_info.agg_kind,
2994  agg_info.sql_type,
2995  query_exe_context->getAggInitValForIndex(out_vec_idx),
2996  float_argument_input ? sizeof(int32_t) : chosen_bytes,
2997  out_vec[out_vec_idx],
2998  entry_count,
2999  false,
3000  float_argument_input);
3001  }
3002  if (error_code) {
3003  break;
3004  }
3005  reduced_outs.push_back(val1);
3006  if (agg_info.agg_kind == kAVG ||
3007  (agg_info.agg_kind == kSAMPLE &&
3008  (agg_info.sql_type.is_varlen() || agg_info.sql_type.is_geometry()))) {
3009  const auto chosen_bytes = static_cast<size_t>(
3010  query_exe_context->query_mem_desc_.getPaddedSlotWidthBytes(out_vec_idx +
3011  1));
3012  int64_t val2;
3013  std::tie(val2, error_code) = Executor::reduceResults(
3014  agg_info.agg_kind == kAVG ? kCOUNT : agg_info.agg_kind,
3015  agg_info.sql_type,
3016  query_exe_context->getAggInitValForIndex(out_vec_idx + 1),
3017  float_argument_input ? sizeof(int32_t) : chosen_bytes,
3018  out_vec[out_vec_idx + 1],
3019  entry_count,
3020  false,
3021  false);
3022  if (error_code) {
3023  break;
3024  }
3025  reduced_outs.push_back(val2);
3026  ++out_vec_idx;
3027  }
3028  ++out_vec_idx;
3029  }
3030  }
3031  }
3032 
3033  if (error_code) {
3034  return error_code;
3035  }
3036 
3037  CHECK_EQ(size_t(1), query_exe_context->query_buffers_->result_sets_.size());
3038  auto rows_ptr = std::shared_ptr<ResultSet>(
3039  query_exe_context->query_buffers_->result_sets_[0].release());
3040  rows_ptr->fillOneEntry(reduced_outs);
3041  results = std::move(rows_ptr);
3042  return error_code;
3043 }
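
The reduction loop above collapses one partial value per entry for each output slot: on CPU there is one entry per fragment, on GPU blockSize() * gridSize() entries per fragment unless shared-memory aggregation already reduced them to a single entry. A simplified sketch of that step, assuming plain additive aggregates and ignoring the null-sentinel, width, and float handling that Executor::reduceResults performs:

  #include <cstddef>
  #include <cstdint>
  #include <vector>

  // Sum one output slot across all entries; a simplified stand-in for the
  // COUNT/SUM path of Executor::reduceResults (no null sentinels, no floats,
  // no overflow checks).
  int64_t reduce_slot(const int64_t* slot, const size_t entry_count) {
    int64_t acc = 0;
    for (size_t i = 0; i < entry_count; ++i) {
      acc += slot[i];
    }
    return acc;
  }

  int main() {
    // Two fragments on CPU, no group by -> entry_count == 2 partials per slot.
    const size_t entry_count = 2;
    std::vector<int64_t> sum_slot{10, 32};   // partial SUMs, one per fragment
    std::vector<int64_t> count_slot{4, 8};   // partial COUNTs, one per fragment

    // AVG is carried as a (sum, count) slot pair and reduced component-wise,
    // mirroring the kAVG branch above; the division happens later in the
    // ResultSet, not here.
    const int64_t total_sum = reduce_slot(sum_slot.data(), entry_count);
    const int64_t total_count = reduce_slot(count_slot.data(), entry_count);
    return (total_sum == 42 && total_count == 12) ? 0 : 1;
  }
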