OmniSciDB  06b3bd477c
Executor Class Reference

#include <Execute.h>


Classes

class  FetchCacheAnchor
 
struct  GroupColLLVMValue
 
struct  JoinHashTableOrError
 

Public Types

using ExecutorId = size_t
 
using CachedCardinality = std::pair< bool, size_t >
 

Public Member Functions

 Executor (const ExecutorId id, const size_t block_size_x, const size_t grid_size_x, const size_t max_gpu_slab_size, const std::string &debug_dir, const std::string &debug_file)
 
StringDictionaryProxy * getStringDictionaryProxy (const int dictId, const std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner, const bool with_generation) const
 
bool isCPUOnly () const
 
bool isArchMaxwell (const ExecutorDeviceType dt) const
 
bool containsLeftDeepOuterJoin () const
 
const ColumnDescriptor * getColumnDescriptor (const Analyzer::ColumnVar *) const
 
const ColumnDescriptor * getPhysicalColumnDescriptor (const Analyzer::ColumnVar *, int) const
 
const Catalog_Namespace::Catalog * getCatalog () const
 
void setCatalog (const Catalog_Namespace::Catalog *catalog)
 
const std::shared_ptr< RowSetMemoryOwner > getRowSetMemoryOwner () const
 
const TemporaryTables * getTemporaryTables () const
 
Fragmenter_Namespace::TableInfo getTableInfo (const int table_id) const
 
const TableGeneration & getTableGeneration (const int table_id) const
 
ExpressionRange getColRange (const PhysicalInput &) const
 
size_t getNumBytesForFetchedRow (const std::set< int > &table_ids_to_fetch) const
 
std::vector< ColumnLazyFetchInfo > getColLazyFetchInfo (const std::vector< Analyzer::Expr * > &target_exprs) const
 
void registerActiveModule (void *module, const int device_id) const
 
void unregisterActiveModule (void *module, const int device_id) const
 
void interrupt (const std::string &query_session="", const std::string &interrupt_session="")
 
void resetInterrupt ()
 
void enableRuntimeQueryInterrupt (const unsigned interrupt_freq) const
 
int8_t warpSize () const
 
unsigned gridSize () const
 
unsigned numBlocksPerMP () const
 
unsigned blockSize () const
 
size_t maxGpuSlabSize () const
 
void setupCaching (const std::unordered_set< PhysicalInput > &phys_inputs, const std::unordered_set< int > &phys_table_ids)
 
template<typename SESSION_MAP_LOCK >
void setCurrentQuerySession (const std::string &query_session, SESSION_MAP_LOCK &write_lock)
 
template<typename SESSION_MAP_LOCK >
std::string & getCurrentQuerySession (SESSION_MAP_LOCK &read_lock)
 
template<typename SESSION_MAP_LOCK >
bool checkCurrentQuerySession (const std::string &candidate_query_session, SESSION_MAP_LOCK &read_lock)
 
template<typename SESSION_MAP_LOCK >
void invalidateQuerySession (SESSION_MAP_LOCK &write_lock)
 
template<typename SESSION_MAP_LOCK >
bool addToQuerySessionList (const std::string &query_session, SESSION_MAP_LOCK &write_lock)
 
template<typename SESSION_MAP_LOCK >
bool removeFromQuerySessionList (const std::string &query_session, SESSION_MAP_LOCK &write_lock)
 
template<typename SESSION_MAP_LOCK >
void setQuerySessionAsInterrupted (const std::string &query_session, SESSION_MAP_LOCK &write_lock)
 
template<typename SESSION_MAP_LOCK >
bool checkIsQuerySessionInterrupted (const std::string &query_session, SESSION_MAP_LOCK &read_lock)
 
mapd_shared_mutex & getSessionLock ()
 
void addToCardinalityCache (const std::string &cache_key, const size_t cache_value)
 
CachedCardinality getCachedCardinality (const std::string &cache_key)
 
template<typename THREAD_POOL >
void launchKernels (SharedKernelContext &shared_context, std::vector< std::unique_ptr< ExecutionKernel >> &&kernels)
 
template<>
void setCurrentQuerySession (const std::string &query_session, mapd_unique_lock< mapd_shared_mutex > &write_lock)
 
template<>
std::string & getCurrentQuerySession (mapd_shared_lock< mapd_shared_mutex > &read_lock)
 
template<>
bool checkCurrentQuerySession (const std::string &candidate_query_session, mapd_shared_lock< mapd_shared_mutex > &read_lock)
 
template<>
void invalidateQuerySession (mapd_unique_lock< mapd_shared_mutex > &write_lock)
 
template<>
bool addToQuerySessionList (const std::string &query_session, mapd_unique_lock< mapd_shared_mutex > &write_lock)
 
template<>
bool removeFromQuerySessionList (const std::string &query_session, mapd_unique_lock< mapd_shared_mutex > &write_lock)
 
template<>
void setQuerySessionAsInterrupted (const std::string &query_session, mapd_unique_lock< mapd_shared_mutex > &write_lock)
 
template<>
bool checkIsQuerySessionInterrupted (const std::string &query_session, mapd_shared_lock< mapd_shared_mutex > &read_lock)
 

Static Public Member Functions

static std::shared_ptr< Executor > getExecutor (const ExecutorId id, const std::string &debug_dir="", const std::string &debug_file="", const SystemParameters system_parameters=SystemParameters())
 
static void nukeCacheOfExecutors ()
 
static void clearMemory (const Data_Namespace::MemoryLevel memory_level)
 
static size_t getArenaBlockSize ()
 
static std::pair< int64_t, int32_t > reduceResults (const SQLAgg agg, const SQLTypeInfo &ti, const int64_t agg_init_val, const int8_t out_byte_width, const int64_t *out_vec, const size_t out_vec_sz, const bool is_group_by, const bool float_argument_input)
 
static void addCodeToCache (const CodeCacheKey &, std::shared_ptr< CompilationContext >, llvm::Module *, CodeCache &)
 

Static Public Attributes

static const ExecutorId UNITARY_EXECUTOR_ID = 0
 
static const size_t high_scan_limit
 
static const int32_t ERR_DIV_BY_ZERO {1}
 
static const int32_t ERR_OUT_OF_GPU_MEM {2}
 
static const int32_t ERR_OUT_OF_SLOTS {3}
 
static const int32_t ERR_UNSUPPORTED_SELF_JOIN {4}
 
static const int32_t ERR_OUT_OF_RENDER_MEM {5}
 
static const int32_t ERR_OUT_OF_CPU_MEM {6}
 
static const int32_t ERR_OVERFLOW_OR_UNDERFLOW {7}
 
static const int32_t ERR_OUT_OF_TIME {9}
 
static const int32_t ERR_INTERRUPTED {10}
 
static const int32_t ERR_COLUMNAR_CONVERSION_NOT_SUPPORTED {11}
 
static const int32_t ERR_TOO_MANY_LITERALS {12}
 
static const int32_t ERR_STRING_CONST_IN_RESULTSET {13}
 
static const int32_t ERR_STREAMING_TOP_N_NOT_SUPPORTED_IN_RENDER_QUERY {14}
 
static const int32_t ERR_SINGLE_VALUE_FOUND_MULTIPLE_VALUES {15}
 
static const int32_t ERR_GEOS {16}
 
static std::mutex compilation_mutex_
 
static std::mutex kernel_mutex_
 

Private Types

using PerFragmentCallBack = std::function< void(ResultSetPtr, const Fragmenter_Namespace::FragmentInfo &)>
 

Private Member Functions

void clearMetaInfoCache ()
 
int deviceCount (const ExecutorDeviceType) const
 
int deviceCountForMemoryLevel (const Data_Namespace::MemoryLevel memory_level) const
 
llvm::Value * codegenWindowFunction (const size_t target_index, const CompilationOptions &co)
 
llvm::Value * codegenWindowFunctionAggregate (const CompilationOptions &co)
 
llvm::BasicBlock * codegenWindowResetStateControlFlow ()
 
void codegenWindowFunctionStateInit (llvm::Value *aggregate_state)
 
llvm::Value * codegenWindowFunctionAggregateCalls (llvm::Value *aggregate_state, const CompilationOptions &co)
 
void codegenWindowAvgEpilogue (llvm::Value *crt_val, llvm::Value *window_func_null_val, llvm::Value *multiplicity_lv)
 
llvm::Value * codegenAggregateWindowState ()
 
llvm::Value * aggregateWindowStatePtr ()
 
bool isArchPascalOrLater (const ExecutorDeviceType dt) const
 
bool needFetchAllFragments (const InputColDescriptor &col_desc, const RelAlgExecutionUnit &ra_exe_unit, const FragmentsList &selected_fragments) const
 
ResultSetPtr executeWorkUnit (size_t &max_groups_buffer_entry_guess, const bool is_agg, const std::vector< InputTableInfo > &, const RelAlgExecutionUnit &, const CompilationOptions &, const ExecutionOptions &options, const Catalog_Namespace::Catalog &, std::shared_ptr< RowSetMemoryOwner >, RenderInfo *render_info, const bool has_cardinality_estimation, ColumnCacheMap &column_cache)
 
void executeUpdate (const RelAlgExecutionUnit &ra_exe_unit, const std::vector< InputTableInfo > &table_infos, const CompilationOptions &co, const ExecutionOptions &eo, const Catalog_Namespace::Catalog &cat, std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner, const UpdateLogForFragment::Callback &cb, const bool is_agg)
 
void executeWorkUnitPerFragment (const RelAlgExecutionUnit &ra_exe_unit, const InputTableInfo &table_info, const CompilationOptions &co, const ExecutionOptions &eo, const Catalog_Namespace::Catalog &cat, PerFragmentCallBack &cb)
 Compiles and dispatches a work unit per fragment, processing results with the per-fragment callback. Currently used for computing metrics over fragments (metadata). More...
 
ResultSetPtr executeExplain (const QueryCompilationDescriptor &)
 
ResultSetPtr executeTableFunction (const TableFunctionExecutionUnit exe_unit, const std::vector< InputTableInfo > &table_infos, const CompilationOptions &co, const ExecutionOptions &eo, const Catalog_Namespace::Catalog &cat)
 Compiles and dispatches a table function; that is, a function that takes as input one or more columns and returns a ResultSet, which can be parsed by subsequent execution steps. More...
 
ExecutorDeviceType getDeviceTypeForTargets (const RelAlgExecutionUnit &ra_exe_unit, const ExecutorDeviceType requested_device_type)
 
ResultSetPtr collectAllDeviceResults (SharedKernelContext &shared_context, const RelAlgExecutionUnit &ra_exe_unit, const QueryMemoryDescriptor &query_mem_desc, const ExecutorDeviceType device_type, std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner)
 
ResultSetPtr collectAllDeviceShardedTopResults (SharedKernelContext &shared_context, const RelAlgExecutionUnit &ra_exe_unit) const
 
std::unordered_map< int, const Analyzer::BinOper * > getInnerTabIdToJoinCond () const
 
std::vector< std::unique_ptr< ExecutionKernel > > createKernels (SharedKernelContext &shared_context, const RelAlgExecutionUnit &ra_exe_unit, ColumnFetcher &column_fetcher, const std::vector< InputTableInfo > &table_infos, const ExecutionOptions &eo, const bool is_agg, const bool allow_single_frag_table_opt, const size_t context_count, const QueryCompilationDescriptor &query_comp_desc, const QueryMemoryDescriptor &query_mem_desc, RenderInfo *render_info, std::unordered_set< int > &available_gpus, int &available_cpus)
 
template<typename THREAD_POOL >
void launchKernels (SharedKernelContext &shared_context, std::vector< std::unique_ptr< ExecutionKernel >> &&kernels)
 
std::vector< size_t > getTableFragmentIndices (const RelAlgExecutionUnit &ra_exe_unit, const ExecutorDeviceType device_type, const size_t table_idx, const size_t outer_frag_idx, std::map< int, const TableFragments * > &selected_tables_fragments, const std::unordered_map< int, const Analyzer::BinOper * > &inner_table_id_to_join_condition)
 
bool skipFragmentPair (const Fragmenter_Namespace::FragmentInfo &outer_fragment_info, const Fragmenter_Namespace::FragmentInfo &inner_fragment_info, const int inner_table_id, const std::unordered_map< int, const Analyzer::BinOper * > &inner_table_id_to_join_condition, const RelAlgExecutionUnit &ra_exe_unit, const ExecutorDeviceType device_type)
 
FetchResult fetchChunks (const ColumnFetcher &, const RelAlgExecutionUnit &ra_exe_unit, const int device_id, const Data_Namespace::MemoryLevel, const std::map< int, const TableFragments * > &, const FragmentsList &selected_fragments, const Catalog_Namespace::Catalog &, std::list< ChunkIter > &, std::list< std::shared_ptr< Chunk_NS::Chunk >> &, DeviceAllocator *device_allocator)
 
FetchResult fetchUnionChunks (const ColumnFetcher &, const RelAlgExecutionUnit &ra_exe_unit, const int device_id, const Data_Namespace::MemoryLevel, const std::map< int, const TableFragments * > &, const FragmentsList &selected_fragments, const Catalog_Namespace::Catalog &, std::list< ChunkIter > &, std::list< std::shared_ptr< Chunk_NS::Chunk >> &, DeviceAllocator *device_allocator)
 
std::pair< std::vector< std::vector< int64_t > >, std::vector< std::vector< uint64_t > > > getRowCountAndOffsetForAllFrags (const RelAlgExecutionUnit &ra_exe_unit, const CartesianProduct< std::vector< std::vector< size_t >>> &frag_ids_crossjoin, const std::vector< InputDescriptor > &input_descs, const std::map< int, const TableFragments * > &all_tables_fragments)
 
void buildSelectedFragsMapping (std::vector< std::vector< size_t >> &selected_fragments_crossjoin, std::vector< size_t > &local_col_to_frag_pos, const std::list< std::shared_ptr< const InputColDescriptor >> &col_global_ids, const FragmentsList &selected_fragments, const RelAlgExecutionUnit &ra_exe_unit)
 
void buildSelectedFragsMappingForUnion (std::vector< std::vector< size_t >> &selected_fragments_crossjoin, std::vector< size_t > &local_col_to_frag_pos, const std::list< std::shared_ptr< const InputColDescriptor >> &col_global_ids, const FragmentsList &selected_fragments, const RelAlgExecutionUnit &ra_exe_unit)
 
std::vector< size_t > getFragmentCount (const FragmentsList &selected_fragments, const size_t scan_idx, const RelAlgExecutionUnit &ra_exe_unit)
 
int32_t executePlanWithGroupBy (const RelAlgExecutionUnit &ra_exe_unit, const CompilationResult &, const bool hoist_literals, ResultSetPtr &results, const ExecutorDeviceType device_type, std::vector< std::vector< const int8_t * >> &col_buffers, const std::vector< size_t > outer_tab_frag_ids, QueryExecutionContext *, const std::vector< std::vector< int64_t >> &num_rows, const std::vector< std::vector< uint64_t >> &frag_offsets, Data_Namespace::DataMgr *, const int device_id, const int outer_table_id, const int64_t limit, const uint32_t start_rowid, const uint32_t num_tables, RenderInfo *render_info)
 
int32_t executePlanWithoutGroupBy (const RelAlgExecutionUnit &ra_exe_unit, const CompilationResult &, const bool hoist_literals, ResultSetPtr &results, const std::vector< Analyzer::Expr * > &target_exprs, const ExecutorDeviceType device_type, std::vector< std::vector< const int8_t * >> &col_buffers, QueryExecutionContext *query_exe_context, const std::vector< std::vector< int64_t >> &num_rows, const std::vector< std::vector< uint64_t >> &frag_offsets, Data_Namespace::DataMgr *data_mgr, const int device_id, const uint32_t start_rowid, const uint32_t num_tables, RenderInfo *render_info)
 
ResultSetPtr resultsUnion (SharedKernelContext &shared_context, const RelAlgExecutionUnit &ra_exe_unit)
 
std::vector< int64_t > getJoinHashTablePtrs (const ExecutorDeviceType device_type, const int device_id)
 
ResultSetPtr reduceMultiDeviceResults (const RelAlgExecutionUnit &, std::vector< std::pair< ResultSetPtr, std::vector< size_t >>> &all_fragment_results, std::shared_ptr< RowSetMemoryOwner >, const QueryMemoryDescriptor &) const
 
ResultSetPtr reduceMultiDeviceResultSets (std::vector< std::pair< ResultSetPtr, std::vector< size_t >>> &all_fragment_results, std::shared_ptr< RowSetMemoryOwner >, const QueryMemoryDescriptor &) const
 
ResultSetPtr reduceSpeculativeTopN (const RelAlgExecutionUnit &, std::vector< std::pair< ResultSetPtr, std::vector< size_t >>> &all_fragment_results, std::shared_ptr< RowSetMemoryOwner >, const QueryMemoryDescriptor &) const
 
ResultSetPtr executeWorkUnitImpl (size_t &max_groups_buffer_entry_guess, const bool is_agg, const bool allow_single_frag_table_opt, const std::vector< InputTableInfo > &, const RelAlgExecutionUnit &, const CompilationOptions &, const ExecutionOptions &options, const Catalog_Namespace::Catalog &, std::shared_ptr< RowSetMemoryOwner >, RenderInfo *render_info, const bool has_cardinality_estimation, ColumnCacheMap &column_cache)
 
std::vector< llvm::Value * > inlineHoistedLiterals ()
 
std::tuple< CompilationResult, std::unique_ptr< QueryMemoryDescriptor > > compileWorkUnit (const std::vector< InputTableInfo > &query_infos, const RelAlgExecutionUnit &ra_exe_unit, const CompilationOptions &co, const ExecutionOptions &eo, const CudaMgr_Namespace::CudaMgr *cuda_mgr, const bool allow_lazy_fetch, std::shared_ptr< RowSetMemoryOwner >, const size_t max_groups_buffer_entry_count, const int8_t crt_min_byte_width, const bool has_cardinality_estimation, ColumnCacheMap &column_cache, RenderInfo *render_info=nullptr)
 
llvm::BasicBlock * codegenSkipDeletedOuterTableRow (const RelAlgExecutionUnit &ra_exe_unit, const CompilationOptions &co)
 
std::vector< JoinLoop > buildJoinLoops (RelAlgExecutionUnit &ra_exe_unit, const CompilationOptions &co, const ExecutionOptions &eo, const std::vector< InputTableInfo > &query_infos, ColumnCacheMap &column_cache)
 
std::function< llvm::Value *(const std::vector< llvm::Value * > &, llvm::Value *)> buildIsDeletedCb (const RelAlgExecutionUnit &ra_exe_unit, const size_t level_idx, const CompilationOptions &co)
 
std::shared_ptr< JoinHashTableInterface > buildCurrentLevelHashTable (const JoinCondition &current_level_join_conditions, RelAlgExecutionUnit &ra_exe_unit, const CompilationOptions &co, const std::vector< InputTableInfo > &query_infos, ColumnCacheMap &column_cache, std::vector< std::string > &fail_reasons)
 
llvm::Value * addJoinLoopIterator (const std::vector< llvm::Value * > &prev_iters, const size_t level_idx)
 
void codegenJoinLoops (const std::vector< JoinLoop > &join_loops, const RelAlgExecutionUnit &ra_exe_unit, GroupByAndAggregate &group_by_and_aggregate, llvm::Function *query_func, llvm::BasicBlock *entry_bb, const QueryMemoryDescriptor &query_mem_desc, const CompilationOptions &co, const ExecutionOptions &eo)
 
bool compileBody (const RelAlgExecutionUnit &ra_exe_unit, GroupByAndAggregate &group_by_and_aggregate, const QueryMemoryDescriptor &query_mem_desc, const CompilationOptions &co, const GpuSharedMemoryContext &gpu_smem_context={})
 
void createErrorCheckControlFlow (llvm::Function *query_func, bool run_with_dynamic_watchdog, bool run_with_allowing_runtime_interrupt, ExecutorDeviceType device_type)
 
void preloadFragOffsets (const std::vector< InputDescriptor > &input_descs, const std::vector< InputTableInfo > &query_infos)
 
JoinHashTableOrError buildHashTableForQualifier (const std::shared_ptr< Analyzer::BinOper > &qual_bin_oper, const std::vector< InputTableInfo > &query_infos, const MemoryLevel memory_level, const JoinHashTableInterface::HashType preferred_hash_type, ColumnCacheMap &column_cache)
 
void nukeOldState (const bool allow_lazy_fetch, const std::vector< InputTableInfo > &query_infos, const RelAlgExecutionUnit *ra_exe_unit)
 
std::shared_ptr< CompilationContext > optimizeAndCodegenCPU (llvm::Function *, llvm::Function *, const std::unordered_set< llvm::Function * > &, const CompilationOptions &)
 
std::shared_ptr< CompilationContext > optimizeAndCodegenGPU (llvm::Function *, llvm::Function *, std::unordered_set< llvm::Function * > &, const bool no_inline, const CudaMgr_Namespace::CudaMgr *cuda_mgr, const CompilationOptions &)
 
std::string generatePTX (const std::string &) const
 
void initializeNVPTXBackend () const
 
int64_t deviceCycles (int milliseconds) const
 
GroupColLLVMValue groupByColumnCodegen (Analyzer::Expr *group_by_col, const size_t col_width, const CompilationOptions &, const bool translate_null_val, const int64_t translated_null_val, GroupByAndAggregate::DiamondCodegen &, std::stack< llvm::BasicBlock * > &, const bool thread_mem_shared)
 
llvm::Value * castToFP (llvm::Value *val)
 
llvm::Value * castToIntPtrTyIn (llvm::Value *val, const size_t bit_width)
 
RelAlgExecutionUnit addDeletedColumn (const RelAlgExecutionUnit &ra_exe_unit, const CompilationOptions &co)
 
std::pair< bool, int64_t > skipFragment (const InputDescriptor &table_desc, const Fragmenter_Namespace::FragmentInfo &frag_info, const std::list< std::shared_ptr< Analyzer::Expr >> &simple_quals, const std::vector< uint64_t > &frag_offsets, const size_t frag_idx)
 
std::pair< bool, int64_t > skipFragmentInnerJoins (const InputDescriptor &table_desc, const RelAlgExecutionUnit &ra_exe_unit, const Fragmenter_Namespace::FragmentInfo &fragment, const std::vector< uint64_t > &frag_offsets, const size_t frag_idx)
 
AggregatedColRange computeColRangesCache (const std::unordered_set< PhysicalInput > &phys_inputs)
 
StringDictionaryGenerations computeStringDictionaryGenerations (const std::unordered_set< PhysicalInput > &phys_inputs)
 
TableGenerations computeTableGenerations (std::unordered_set< int > phys_table_ids)
 
std::shared_ptr< CompilationContext > getCodeFromCache (const CodeCacheKey &, const CodeCache &)
 
std::vector< int8_t > serializeLiterals (const std::unordered_map< int, CgenState::LiteralValues > &literals, const int device_id)
 
llvm::Value * spillDoubleElement (llvm::Value *elem_val, llvm::Type *elem_ty)
 

Static Private Member Functions

static size_t align (const size_t off_in, const size_t alignment)
 

Private Attributes

std::unique_ptr< CgenState > cgen_state_
 
std::unique_ptr< PlanState > plan_state_
 
std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner_
 
std::mutex gpu_exec_mutex_ [max_gpu_count]
 
std::shared_ptr< StringDictionaryProxy > lit_str_dict_proxy_
 
std::mutex str_dict_mutex_
 
std::unique_ptr< llvm::TargetMachine > nvptx_target_machine_
 
CodeCache cpu_code_cache_
 
CodeCache gpu_code_cache_
 
const unsigned block_size_x_
 
const unsigned grid_size_x_
 
const size_t max_gpu_slab_size_
 
const std::string debug_dir_
 
const std::string debug_file_
 
const ExecutorId executor_id_
 
const Catalog_Namespace::Catalog * catalog_
 
const TemporaryTables * temporary_tables_
 
int64_t kernel_queue_time_ms_ = 0
 
int64_t compilation_queue_time_ms_ = 0
 
std::unique_ptr< WindowProjectNodeContext > window_project_node_context_owned_
 
WindowFunctionContext * active_window_function_ {nullptr}
 
InputTableInfoCache input_table_info_cache_
 
AggregatedColRange agg_col_range_cache_
 
StringDictionaryGenerations string_dictionary_generations_
 
TableGenerations table_generations_
 

Static Private Attributes

static const int max_gpu_count {16}
 
static std::mutex gpu_active_modules_mutex_
 
static uint32_t gpu_active_modules_device_mask_ {0x0}
 
static void * gpu_active_modules_ [max_gpu_count]
 
static std::atomic< bool > interrupted_ {false}
 
static const size_t baseline_threshold
 
static const size_t code_cache_size {1000}
 
static mapd_shared_mutex executor_session_mutex_
 
static std::string current_query_session_ {""}
 
static InterruptFlagMap queries_interrupt_flag_
 
static std::map< int, std::shared_ptr< Executor > > executors_
 
static std::atomic_flag execute_spin_lock_ = ATOMIC_FLAG_INIT
 
static mapd_shared_mutex execute_mutex_
 
static mapd_shared_mutex executors_cache_mutex_
 
static mapd_shared_mutex recycler_mutex_
 
static std::unordered_map< std::string, size_t > cardinality_cache_
 

Friends

class BaselineJoinHashTable
 
class CodeGenerator
 
class ColumnFetcher
 
class ExecutionKernel
 
class OverlapsJoinHashTable
 
class GroupByAndAggregate
 
class QueryCompilationDescriptor
 
class QueryMemoryDescriptor
 
class QueryMemoryInitializer
 
class QueryFragmentDescriptor
 
class QueryExecutionContext
 
class ResultSet
 
class InValuesBitmap
 
class JoinHashTable
 
class LeafAggregator
 
class QueryRewriter
 
class PendingExecutionClosure
 
class RelAlgExecutor
 
class TableOptimizer
 
class TableFunctionCompilationContext
 
class TableFunctionExecutionContext
 
struct TargetExprCodegenBuilder
 
struct TargetExprCodegen
 
class WindowProjectNodeContext
 

Detailed Description

Definition at line 295 of file Execute.h.

Member Typedef Documentation

using Executor::CachedCardinality = std::pair<bool, size_t>

Definition at line 835 of file Execute.h.

using Executor::ExecutorId = size_t

Definition at line 302 of file Execute.h.

using Executor::PerFragmentCallBack = std::function<void(ResultSetPtr, const Fragmenter_Namespace::FragmentInfo &)> (private)

Definition at line 454 of file Execute.h.

Constructor & Destructor Documentation

Executor::Executor ( const ExecutorId  id,
const size_t  block_size_x,
const size_t  grid_size_x,
const size_t  max_gpu_slab_size,
const std::string &  debug_dir,
const std::string &  debug_file 
)

Definition at line 129 of file Execute.cpp.

135  : cgen_state_(new CgenState({}, false))
std::unique_ptr< CgenState > cgen_state_
Definition: Execute.h:855

Member Function Documentation

void Executor::addCodeToCache ( const CodeCacheKey &  key,
std::shared_ptr< CompilationContext >  compilation_context,
llvm::Module *  module,
CodeCache &  cache 
)
static

Definition at line 219 of file NativeCodegen.cpp.

References LruCache< key_t, value_t, hash_t >::put().

Referenced by StubGenerator::generateStub().

222  {
223  cache.put(key,
224  std::make_pair<std::shared_ptr<CompilationContext>, decltype(module)>(
225  std::move(compilation_context), std::move(module)));
226 }
std::unique_ptr< llvm::Module > module(runtime_module_shallow_copy(cgen_state))
void put(const key_t &key, value_t &&value)
Definition: LruCache.hpp:27
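The cache keeps the compiled artifact keyed by a CodeCacheKey so that identical work units can skip recompilation; getCodeFromCache() performs the matching lookup. A minimal, self-contained sketch of the same put/get pattern follows (a plain std::unordered_map stands in for the LruCache used here; all names are illustrative, not the OmniSciDB API):

#include <memory>
#include <string>
#include <unordered_map>

// Illustrative stand-ins for CompilationContext and CodeCache.
struct FakeCompilationContext { std::string ir; };
using FakeCodeCache =
    std::unordered_map<std::string, std::shared_ptr<FakeCompilationContext>>;

// Mirrors getCodeFromCache(): return the cached artifact, or nullptr on a miss.
std::shared_ptr<FakeCompilationContext> get_code_from_cache(
    const FakeCodeCache& cache, const std::string& key) {
  const auto it = cache.find(key);
  return it == cache.end() ? nullptr : it->second;
}

// Mirrors addCodeToCache(): store the freshly compiled artifact under its key.
void add_code_to_cache(FakeCodeCache& cache,
                       const std::string& key,
                       std::shared_ptr<FakeCompilationContext> ctx) {
  cache.emplace(key, std::move(ctx));
}

int main() {
  FakeCodeCache cache;
  add_code_to_cache(cache, "work_unit_key", std::make_shared<FakeCompilationContext>());
  const auto hit = get_code_from_cache(cache, "work_unit_key");
  return hit ? 0 : 1;  // on a hit, reuse the cached module instead of recompiling
}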



RelAlgExecutionUnit Executor::addDeletedColumn ( const RelAlgExecutionUnit &  ra_exe_unit,
const CompilationOptions &  co 
)
private

Definition at line 3198 of file Execute.cpp.

References catalog_(), CHECK(), CompilationOptions::filter_on_deleted_column, and TABLE.

3199  {
3200  if (!co.filter_on_deleted_column) {
3201  return ra_exe_unit;
3202  }
3203  auto ra_exe_unit_with_deleted = ra_exe_unit;
3204  for (const auto& input_table : ra_exe_unit_with_deleted.input_descs) {
3205  if (input_table.getSourceType() != InputSourceType::TABLE) {
3206  continue;
3207  }
3208  const auto td = catalog_->getMetadataForTable(input_table.getTableId());
3209  CHECK(td);
3210  const auto deleted_cd = catalog_->getDeletedColumnIfRowsDeleted(td);
3211  if (!deleted_cd) {
3212  continue;
3213  }
3214  CHECK(deleted_cd->columnType.is_boolean());
3215  // check deleted column is not already present
3216  bool found = false;
3217  for (const auto& input_col : ra_exe_unit_with_deleted.input_col_descs) {
3218  if (input_col.get()->getColId() == deleted_cd->columnId &&
3219  input_col.get()->getScanDesc().getTableId() == deleted_cd->tableId &&
3220  input_col.get()->getScanDesc().getNestLevel() == input_table.getNestLevel()) {
3221  found = true;
3222  }
3223  }
3224  if (!found) {
3225  // add deleted column
3226  ra_exe_unit_with_deleted.input_col_descs.emplace_back(new InputColDescriptor(
3227  deleted_cd->columnId, deleted_cd->tableId, input_table.getNestLevel()));
3228  }
3229  }
3230  return ra_exe_unit_with_deleted;
3231 }
CHECK(cgen_state)
const Catalog_Namespace::Catalog * catalog_
Definition: Execute.h:900
const ColumnDescriptor * getDeletedColumnIfRowsDeleted(const TableDescriptor *td) const
Definition: Catalog.cpp:2774
const TableDescriptor * getMetadataForTable(const std::string &tableName, const bool populateFragmenter=true) const
Returns a pointer to a const TableDescriptor struct matching the provided tableName.


llvm::Value * Executor::addJoinLoopIterator ( const std::vector< llvm::Value * > &  prev_iters,
const size_t  level_idx 
)
private

Definition at line 530 of file IRCodegen.cpp.

References CodeGenerator::cgen_state_, CHECK(), and CgenState::scan_idx_to_hash_pos_.

531  {
532  // Iterators are added for loop-outer joins when the head of the loop is generated,
533  // then once again when the body if generated. Allow this instead of special handling
534  // of call sites.
535  const auto it = cgen_state_->scan_idx_to_hash_pos_.find(level_idx);
536  if (it != cgen_state_->scan_idx_to_hash_pos_.end()) {
537  return it->second;
538  }
539  CHECK(!prev_iters.empty());
540  llvm::Value* matching_row_index = prev_iters.back();
541  const auto it_ok =
542  cgen_state_->scan_idx_to_hash_pos_.emplace(level_idx, matching_row_index);
543  CHECK(it_ok.second);
544  return matching_row_index;
545 }
std::unique_ptr< CgenState > cgen_state_
Definition: Execute.h:855
CHECK(cgen_state)


void Executor::addToCardinalityCache ( const std::string &  cache_key,
const size_t  cache_value 
)

Definition at line 3583 of file Execute.cpp.

References g_use_estimator_result_cache, and VLOG.

3584  {
3586  mapd_unique_lock<mapd_shared_mutex> lock(recycler_mutex_);
3587  cardinality_cache_[cache_key] = cache_value;
3588  VLOG(1) << "Put estimated cardinality to the cache";
3589  }
3590 }
static std::unordered_map< std::string, size_t > cardinality_cache_
Definition: Execute.h:932
bool g_use_estimator_result_cache
Definition: Execute.cpp:109
static mapd_shared_mutex recycler_mutex_
Definition: Execute.h:931
#define VLOG(n)
Definition: Logger.h:291
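addToCardinalityCache() and getCachedCardinality() together implement a simple estimator-result cache: the getter returns a CachedCardinality, i.e. a std::pair<bool, size_t> whose first element says whether the key was found. A minimal sketch of that pattern (illustrative names, std::shared_mutex standing in for mapd_shared_mutex):

#include <cstddef>
#include <mutex>
#include <shared_mutex>
#include <string>
#include <unordered_map>
#include <utility>

using CachedCardinality = std::pair<bool, size_t>;  // {found, value}

std::shared_mutex recycler_mutex;                        // guards the cache
std::unordered_map<std::string, size_t> cardinality_cache;

void add_to_cardinality_cache(const std::string& key, const size_t value) {
  std::unique_lock<std::shared_mutex> write_lock(recycler_mutex);
  cardinality_cache[key] = value;
}

CachedCardinality get_cached_cardinality(const std::string& key) {
  std::shared_lock<std::shared_mutex> read_lock(recycler_mutex);
  const auto it = cardinality_cache.find(key);
  if (it == cardinality_cache.end()) {
    return {false, 0};  // not cached
  }
  return {true, it->second};
}

int main() {
  add_to_cardinality_cache("estimator:query_42", 1000);
  const auto cached = get_cached_cardinality("estimator:query_42");
  return cached.first && cached.second == 1000 ? 0 : 1;
}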
template<typename SESSION_MAP_LOCK >
bool Executor::addToQuerySessionList ( const std::string &  query_session,
SESSION_MAP_LOCK &  write_lock 
)
template<>
bool Executor::addToQuerySessionList ( const std::string &  query_session,
mapd_unique_lock< mapd_shared_mutex > &  write_lock 
)

Definition at line 3547 of file Execute.cpp.

3548  {
3549  return queries_interrupt_flag_.emplace(query_session, false).second;
3550 }
static InterruptFlagMap queries_interrupt_flag_
Definition: Execute.h:919
llvm::Value * Executor::aggregateWindowStatePtr ( )
private

Definition at line 123 of file WindowFunctionIR.cpp.

References anonymous_namespace{WindowFunctionIR.cpp}::get_adjusted_window_type_info(), get_int_type(), WindowProjectNodeContext::getActiveWindowFunctionContext(), and kFLOAT.

123  {
124  const auto window_func_context =
126  const auto window_func = window_func_context->getWindowFunction();
127  const auto arg_ti = get_adjusted_window_type_info(window_func);
128  llvm::Type* aggregate_state_type =
129  arg_ti.get_type() == kFLOAT
130  ? llvm::PointerType::get(get_int_type(32, cgen_state_->context_), 0)
131  : llvm::PointerType::get(get_int_type(64, cgen_state_->context_), 0);
132  const auto aggregate_state_i64 = cgen_state_->llInt(
133  reinterpret_cast<const int64_t>(window_func_context->aggregateState()));
134  return cgen_state_->ir_builder_.CreateIntToPtr(aggregate_state_i64,
135  aggregate_state_type);
136 }
std::unique_ptr< CgenState > cgen_state_
Definition: Execute.h:855
llvm::Type * get_int_type(const int width, llvm::LLVMContext &context)
static WindowFunctionContext * getActiveWindowFunctionContext(Executor *executor)
SQLTypeInfo get_adjusted_window_type_info(const Analyzer::WindowFunction *window_func)


static size_t Executor::align ( const size_t  off_in,
const size_t  alignment 
)
inlinestaticprivate

Definition at line 847 of file Execute.h.

847  {
848  size_t off = off_in;
849  if (off % alignment != 0) {
850  off += (alignment - off % alignment);
851  }
852  return off;
853  }
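align() rounds an offset up to the next multiple of the alignment, as used for example when computing byte offsets for serialized literal buffers. A quick worked check of the helper (hypothetical values):

#include <cassert>
#include <cstddef>

size_t align(const size_t off_in, const size_t alignment) {
  size_t off = off_in;
  if (off % alignment != 0) {
    off += (alignment - off % alignment);
  }
  return off;
}

int main() {
  assert(align(13, 8) == 16);  // 13 rounded up to the next multiple of 8
  assert(align(16, 8) == 16);  // already aligned, unchanged
  assert(align(1, 4) == 4);
  return 0;
}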
unsigned Executor::blockSize ( ) const

Definition at line 3128 of file Execute.cpp.

References block_size_x_(), catalog_(), and CHECK().

3128  {
3129  CHECK(catalog_);
3130  const auto cuda_mgr = catalog_->getDataMgr().getCudaMgr();
3131  CHECK(cuda_mgr);
3132  const auto& dev_props = cuda_mgr->getAllDeviceProperties();
3133  return block_size_x_ ? block_size_x_ : dev_props.front().maxThreadsPerBlock;
3134 }
CudaMgr_Namespace::CudaMgr * getCudaMgr() const
Definition: DataMgr.h:207
Data_Namespace::DataMgr & getDataMgr() const
Definition: Catalog.h:195
CHECK(cgen_state)
const Catalog_Namespace::Catalog * catalog_
Definition: Execute.h:900
const unsigned block_size_x_
Definition: Execute.h:893


std::shared_ptr< JoinHashTableInterface > Executor::buildCurrentLevelHashTable ( const JoinCondition &  current_level_join_conditions,
RelAlgExecutionUnit &  ra_exe_unit,
const CompilationOptions &  co,
const std::vector< InputTableInfo > &  query_infos,
ColumnCacheMap &  column_cache,
std::vector< std::string > &  fail_reasons 
)
private

Definition at line 484 of file IRCodegen.cpp.

References anonymous_namespace{IRCodegen.cpp}::add_qualifier_to_execution_unit(), Data_Namespace::CPU_LEVEL, CompilationOptions::device_type, Executor::JoinHashTableOrError::fail_reason, GPU, Data_Namespace::GPU_LEVEL, Executor::JoinHashTableOrError::hash_table, INNER, IS_EQUIVALENCE, PlanState::join_info_, JoinHashTableInterface::OneToOne, CodeGenerator::plan_state_, JoinCondition::quals, and JoinCondition::type.

490  {
491  if (current_level_join_conditions.type != JoinType::INNER &&
492  current_level_join_conditions.quals.size() > 1) {
493  fail_reasons.emplace_back("No equijoin expression found for outer join");
494  return nullptr;
495  }
496  std::shared_ptr<JoinHashTableInterface> current_level_hash_table;
497  for (const auto& join_qual : current_level_join_conditions.quals) {
498  auto qual_bin_oper = std::dynamic_pointer_cast<Analyzer::BinOper>(join_qual);
499  if (!qual_bin_oper || !IS_EQUIVALENCE(qual_bin_oper->get_optype())) {
500  fail_reasons.emplace_back("No equijoin expression found");
501  if (current_level_join_conditions.type == JoinType::INNER) {
502  add_qualifier_to_execution_unit(ra_exe_unit, join_qual);
503  }
504  continue;
505  }
506  JoinHashTableOrError hash_table_or_error;
507  if (!current_level_hash_table) {
508  hash_table_or_error = buildHashTableForQualifier(
509  qual_bin_oper,
510  query_infos,
514  column_cache);
515  current_level_hash_table = hash_table_or_error.hash_table;
516  }
517  if (hash_table_or_error.hash_table) {
518  plan_state_->join_info_.join_hash_tables_.push_back(hash_table_or_error.hash_table);
519  plan_state_->join_info_.equi_join_tautologies_.push_back(qual_bin_oper);
520  } else {
521  fail_reasons.push_back(hash_table_or_error.fail_reason);
522  if (current_level_join_conditions.type == JoinType::INNER) {
523  add_qualifier_to_execution_unit(ra_exe_unit, qual_bin_oper);
524  }
525  }
526  }
527  return current_level_hash_table;
528 }
#define IS_EQUIVALENCE(X)
Definition: sqldefs.h:67
std::unique_ptr< PlanState > plan_state_
Definition: Execute.h:870
void add_qualifier_to_execution_unit(RelAlgExecutionUnit &ra_exe_unit, const std::shared_ptr< Analyzer::Expr > &qual)
Definition: IRCodegen.cpp:224
ExecutorDeviceType device_type
std::list< std::shared_ptr< Analyzer::Expr > > quals
JoinHashTableOrError buildHashTableForQualifier(const std::shared_ptr< Analyzer::BinOper > &qual_bin_oper, const std::vector< InputTableInfo > &query_infos, const MemoryLevel memory_level, const JoinHashTableInterface::HashType preferred_hash_type, ColumnCacheMap &column_cache)
Definition: Execute.cpp:3074


Executor::JoinHashTableOrError Executor::buildHashTableForQualifier ( const std::shared_ptr< Analyzer::BinOper > &  qual_bin_oper,
const std::vector< InputTableInfo > &  query_infos,
const MemoryLevel  memory_level,
const JoinHashTableInterface::HashType  preferred_hash_type,
ColumnCacheMap &  column_cache 
)
private

Definition at line 3074 of file Execute.cpp.

References g_enable_dynamic_watchdog, g_enable_overlaps_hashjoin, g_enable_runtime_query_interrupt, and JoinHashTableInterface::getInstance().

3079  {
3080  if (!g_enable_overlaps_hashjoin && qual_bin_oper->is_overlaps_oper()) {
3081  return {nullptr, "Overlaps hash join disabled, attempting to fall back to loop join"};
3082  }
3083  // check whether the interrupt flag turns on (non kernel-time query interrupt)
3085  interrupted_.load()) {
3086  resetInterrupt();
3088  }
3089  try {
3090  auto tbl =
3092  query_infos,
3093  memory_level,
3094  preferred_hash_type,
3095  deviceCountForMemoryLevel(memory_level),
3096  column_cache,
3097  this);
3098  return {tbl, ""};
3099  } catch (const HashJoinFail& e) {
3100  return {nullptr, e.what()};
3101  }
3102 }
static const int32_t ERR_INTERRUPTED
Definition: Execute.h:943
bool g_enable_dynamic_watchdog
Definition: Execute.cpp:75
bool g_enable_overlaps_hashjoin
Definition: Execute.cpp:91
static std::shared_ptr< JoinHashTableInterface > getInstance(const std::shared_ptr< Analyzer::BinOper > qual_bin_oper, const std::vector< InputTableInfo > &query_infos, const Data_Namespace::MemoryLevel memory_level, const HashType preferred_hash_type, const int device_count, ColumnCacheMap &column_cache, Executor *executor)
Make hash table from an in-flight SQL query&#39;s parse tree etc.
void resetInterrupt()
static std::atomic< bool > interrupted_
Definition: Execute.h:879
int deviceCountForMemoryLevel(const Data_Namespace::MemoryLevel memory_level) const
Definition: Execute.cpp:625
bool g_enable_runtime_query_interrupt
Definition: Execute.cpp:108
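The JoinHashTableOrError return value carries either a hash table or the reason the build failed; buildCurrentLevelHashTable() collects the fail reasons and buildJoinLoops() falls back to a loop join when no table could be built. A self-contained sketch of that result-or-reason pattern (illustrative types only, not the OmniSciDB API):

#include <iostream>
#include <memory>
#include <string>

struct FakeHashTable {};  // stand-in for a concrete join hash table

// Mirrors Executor::JoinHashTableOrError: a table pointer plus a fail reason.
struct HashTableOrError {
  std::shared_ptr<FakeHashTable> hash_table;
  std::string fail_reason;
};

HashTableOrError build_hash_table(bool can_build) {
  if (!can_build) {
    return {nullptr, "No equijoin expression found"};
  }
  return {std::make_shared<FakeHashTable>(), ""};
}

int main() {
  const auto result = build_hash_table(false);
  if (result.hash_table) {
    std::cout << "use hash join\n";
  } else {
    // Mirrors the fallback in buildJoinLoops: record the reason, use a loop join.
    std::cout << "falling back to loop join: " << result.fail_reason << "\n";
  }
  return 0;
}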


std::function< llvm::Value *(const std::vector< llvm::Value * > &, llvm::Value *)> Executor::buildIsDeletedCb ( const RelAlgExecutionUnit &  ra_exe_unit,
const size_t  level_idx,
const CompilationOptions &  co 
)
private

Definition at line 425 of file IRCodegen.cpp.

References catalog_(), CodeGenerator::cgen_state_, CHECK(), CHECK_LT, CodeGenerator::codegen(), CgenState::context_, CompilationOptions::filter_on_deleted_column, RelAlgExecutionUnit::input_descs, CgenState::ir_builder_, CgenState::llBool(), CgenState::llInt(), CgenState::row_func_, TABLE, and CodeGenerator::toBool().

427  {
428  if (!co.filter_on_deleted_column) {
429  return nullptr;
430  }
431  CHECK_LT(level_idx + 1, ra_exe_unit.input_descs.size());
432  const auto input_desc = ra_exe_unit.input_descs[level_idx + 1];
433  if (input_desc.getSourceType() != InputSourceType::TABLE) {
434  return nullptr;
435  }
436  const auto td = catalog_->getMetadataForTable(input_desc.getTableId());
437  CHECK(td);
438  const auto deleted_cd = catalog_->getDeletedColumnIfRowsDeleted(td);
439  if (!deleted_cd) {
440  return nullptr;
441  }
442  CHECK(deleted_cd->columnType.is_boolean());
443  const auto deleted_expr = makeExpr<Analyzer::ColumnVar>(deleted_cd->columnType,
444  input_desc.getTableId(),
445  deleted_cd->columnId,
446  input_desc.getNestLevel());
447  return [this, deleted_expr, level_idx, &co](const std::vector<llvm::Value*>& prev_iters,
448  llvm::Value* have_more_inner_rows) {
449  const auto matching_row_index = addJoinLoopIterator(prev_iters, level_idx + 1);
450  // Avoid fetching the deleted column from a position which is not valid.
451  // An invalid position can be returned by a one to one hash lookup (negative)
452  // or at the end of iteration over a set of matching values.
453  llvm::Value* is_valid_it{nullptr};
454  if (have_more_inner_rows) {
455  is_valid_it = have_more_inner_rows;
456  } else {
457  is_valid_it = cgen_state_->ir_builder_.CreateICmp(
458  llvm::ICmpInst::ICMP_SGE, matching_row_index, cgen_state_->llInt<int64_t>(0));
459  }
460  const auto it_valid_bb = llvm::BasicBlock::Create(
461  cgen_state_->context_, "it_valid", cgen_state_->row_func_);
462  const auto it_not_valid_bb = llvm::BasicBlock::Create(
463  cgen_state_->context_, "it_not_valid", cgen_state_->row_func_);
464  cgen_state_->ir_builder_.CreateCondBr(is_valid_it, it_valid_bb, it_not_valid_bb);
465  const auto row_is_deleted_bb = llvm::BasicBlock::Create(
466  cgen_state_->context_, "row_is_deleted", cgen_state_->row_func_);
467  cgen_state_->ir_builder_.SetInsertPoint(it_valid_bb);
468  CodeGenerator code_generator(this);
469  const auto row_is_deleted = code_generator.toBool(
470  code_generator.codegen(deleted_expr.get(), true, co).front());
471  cgen_state_->ir_builder_.CreateBr(row_is_deleted_bb);
472  cgen_state_->ir_builder_.SetInsertPoint(it_not_valid_bb);
473  const auto row_is_deleted_default = cgen_state_->llBool(false);
474  cgen_state_->ir_builder_.CreateBr(row_is_deleted_bb);
475  cgen_state_->ir_builder_.SetInsertPoint(row_is_deleted_bb);
476  auto row_is_deleted_or_default =
477  cgen_state_->ir_builder_.CreatePHI(row_is_deleted->getType(), 2);
478  row_is_deleted_or_default->addIncoming(row_is_deleted, it_valid_bb);
479  row_is_deleted_or_default->addIncoming(row_is_deleted_default, it_not_valid_bb);
480  return row_is_deleted_or_default;
481  };
482 }
std::vector< InputDescriptor > input_descs
std::unique_ptr< CgenState > cgen_state_
Definition: Execute.h:855
CHECK(cgen_state)
const Catalog_Namespace::Catalog * catalog_
Definition: Execute.h:900
const ColumnDescriptor * getDeletedColumnIfRowsDeleted(const TableDescriptor *td) const
Definition: Catalog.cpp:2774
#define CHECK_LT(x, y)
Definition: Logger.h:207
llvm::Value * addJoinLoopIterator(const std::vector< llvm::Value * > &prev_iters, const size_t level_idx)
Definition: IRCodegen.cpp:530
const TableDescriptor * getMetadataForTable(const std::string &tableName, const bool populateFragmenter=true) const
Returns a pointer to a const TableDescriptor struct matching the provided tableName.


std::vector< JoinLoop > Executor::buildJoinLoops ( RelAlgExecutionUnit &  ra_exe_unit,
const CompilationOptions &  co,
const ExecutionOptions &  eo,
const std::vector< InputTableInfo > &  query_infos,
ColumnCacheMap &  column_cache 
)
private

Definition at line 256 of file IRCodegen.cpp.

References CodeGenerator::cgen_state_, CHECK(), CHECK_LT, CodeGenerator::codegen(), INJECT_TIMER, CgenState::ir_builder_, RelAlgExecutionUnit::join_quals, LEFT, CgenState::llBool(), JoinHashTableInterface::OneToOne, CgenState::outer_join_match_found_per_level_, Set, Singleton, JoinLoopDomain::slot_lookup_result, CodeGenerator::toBool(), JoinCondition::type, and JoinLoopDomain::values_buffer.

261  {
263  std::vector<JoinLoop> join_loops;
264  for (size_t level_idx = 0, current_hash_table_idx = 0;
265  level_idx < ra_exe_unit.join_quals.size();
266  ++level_idx) {
267  const auto& current_level_join_conditions = ra_exe_unit.join_quals[level_idx];
268  std::vector<std::string> fail_reasons;
269  const auto build_cur_level_hash_table = [&]() {
270  if (current_level_join_conditions.quals.size() > 1) {
271  const auto first_qual = *current_level_join_conditions.quals.begin();
272  auto qual_bin_oper =
273  std::dynamic_pointer_cast<const Analyzer::BinOper>(first_qual);
274  if (qual_bin_oper && qual_bin_oper->is_overlaps_oper() &&
275  current_level_join_conditions.type == JoinType::LEFT) {
276  JoinCondition join_condition{{first_qual}, current_level_join_conditions.type};
277 
279  join_condition, ra_exe_unit, co, query_infos, column_cache, fail_reasons);
280  }
281  }
282  return buildCurrentLevelHashTable(current_level_join_conditions,
283  ra_exe_unit,
284  co,
285  query_infos,
286  column_cache,
287  fail_reasons);
288  };
289  const auto current_level_hash_table = build_cur_level_hash_table();
290  const auto found_outer_join_matches_cb =
291  [this, level_idx](llvm::Value* found_outer_join_matches) {
292  CHECK_LT(level_idx, cgen_state_->outer_join_match_found_per_level_.size());
293  CHECK(!cgen_state_->outer_join_match_found_per_level_[level_idx]);
294  cgen_state_->outer_join_match_found_per_level_[level_idx] =
295  found_outer_join_matches;
296  };
297  const auto is_deleted_cb = buildIsDeletedCb(ra_exe_unit, level_idx, co);
298  const auto outer_join_condition_multi_quals_cb =
299  [this, level_idx, &co, &current_level_join_conditions](
300  const std::vector<llvm::Value*>& prev_iters) {
301  // The values generated for the match path don't dominate all uses
302  // since on the non-match path nulls are generated. Reset the cache
303  // once the condition is generated to avoid incorrect reuse.
304  FetchCacheAnchor anchor(cgen_state_.get());
305  addJoinLoopIterator(prev_iters, level_idx + 1);
306  llvm::Value* left_join_cond = cgen_state_->llBool(true);
307  CodeGenerator code_generator(this);
308  // Do not want to look at all quals! only 1..N quals (ignore first qual)
309  // Note(jclay): this may need to support cases larger than 2
310  // are there any?
311  if (current_level_join_conditions.quals.size() >= 2) {
312  auto qual_it = std::next(current_level_join_conditions.quals.begin(), 1);
313  for (; qual_it != current_level_join_conditions.quals.end();
314  std::advance(qual_it, 1)) {
315  left_join_cond = cgen_state_->ir_builder_.CreateAnd(
316  left_join_cond,
317  code_generator.toBool(
318  code_generator.codegen(qual_it->get(), true, co).front()));
319  }
320  }
321  return left_join_cond;
322  };
323  if (current_level_hash_table) {
324  if (current_level_hash_table->getHashType() == JoinHashTable::HashType::OneToOne) {
325  join_loops.emplace_back(
327  current_level_join_conditions.type,
328  [this, current_hash_table_idx, level_idx, current_level_hash_table, &co](
329  const std::vector<llvm::Value*>& prev_iters) {
330  addJoinLoopIterator(prev_iters, level_idx);
331  JoinLoopDomain domain{{0}};
332  domain.slot_lookup_result =
333  current_level_hash_table->codegenSlot(co, current_hash_table_idx);
334  return domain;
335  },
336  nullptr,
337  current_level_join_conditions.type == JoinType::LEFT
338  ? std::function<void(llvm::Value*)>(found_outer_join_matches_cb)
339  : nullptr,
340  is_deleted_cb);
341  } else {
342  join_loops.emplace_back(
343  /*kind=*/JoinLoopKind::Set,
344  /*type=*/current_level_join_conditions.type,
345  /*iteration_domain_codegen=*/
346  [this, current_hash_table_idx, level_idx, current_level_hash_table, &co](
347  const std::vector<llvm::Value*>& prev_iters) {
348  addJoinLoopIterator(prev_iters, level_idx);
349  JoinLoopDomain domain{{0}};
350  const auto matching_set = current_level_hash_table->codegenMatchingSet(
351  co, current_hash_table_idx);
352  domain.values_buffer = matching_set.elements;
353  domain.element_count = matching_set.count;
354  return domain;
355  },
356  /*outer_condition_match=*/
357  current_level_join_conditions.type == JoinType::LEFT
358  ? std::function<llvm::Value*(const std::vector<llvm::Value*>&)>(
359  outer_join_condition_multi_quals_cb)
360  : nullptr,
361  /*found_outer_matches=*/current_level_join_conditions.type == JoinType::LEFT
362  ? std::function<void(llvm::Value*)>(found_outer_join_matches_cb)
363  : nullptr,
364  /*is_deleted=*/is_deleted_cb);
365  }
366  ++current_hash_table_idx;
367  } else {
368  const auto fail_reasons_str = current_level_join_conditions.quals.empty()
369  ? "No equijoin expression found"
370  : boost::algorithm::join(fail_reasons, " | ");
372  ra_exe_unit, eo, query_infos, level_idx, fail_reasons_str);
373  // Callback provided to the `JoinLoop` framework to evaluate the (outer) join
374  // condition.
375  VLOG(1) << "Unable to build hash table, falling back to loop join: "
376  << fail_reasons_str;
377  const auto outer_join_condition_cb =
378  [this, level_idx, &co, &current_level_join_conditions](
379  const std::vector<llvm::Value*>& prev_iters) {
380  // The values generated for the match path don't dominate all uses
381  // since on the non-match path nulls are generated. Reset the cache
382  // once the condition is generated to avoid incorrect reuse.
383  FetchCacheAnchor anchor(cgen_state_.get());
384  addJoinLoopIterator(prev_iters, level_idx + 1);
385  llvm::Value* left_join_cond = cgen_state_->llBool(true);
386  CodeGenerator code_generator(this);
387  for (auto expr : current_level_join_conditions.quals) {
388  left_join_cond = cgen_state_->ir_builder_.CreateAnd(
389  left_join_cond,
390  code_generator.toBool(
391  code_generator.codegen(expr.get(), true, co).front()));
392  }
393  return left_join_cond;
394  };
395  join_loops.emplace_back(
396  /*kind=*/JoinLoopKind::UpperBound,
397  /*type=*/current_level_join_conditions.type,
398  /*iteration_domain_codegen=*/
399  [this, level_idx](const std::vector<llvm::Value*>& prev_iters) {
400  addJoinLoopIterator(prev_iters, level_idx);
401  JoinLoopDomain domain{{0}};
402  const auto rows_per_scan_ptr = cgen_state_->ir_builder_.CreateGEP(
403  get_arg_by_name(cgen_state_->row_func_, "num_rows_per_scan"),
404  cgen_state_->llInt(int32_t(level_idx + 1)));
405  domain.upper_bound = cgen_state_->ir_builder_.CreateLoad(rows_per_scan_ptr,
406  "num_rows_per_scan");
407  return domain;
408  },
409  /*outer_condition_match=*/
410  current_level_join_conditions.type == JoinType::LEFT
411  ? std::function<llvm::Value*(const std::vector<llvm::Value*>&)>(
412  outer_join_condition_cb)
413  : nullptr,
414  /*found_outer_matches=*/
415  current_level_join_conditions.type == JoinType::LEFT
416  ? std::function<void(llvm::Value*)>(found_outer_join_matches_cb)
417  : nullptr,
418  /*is_deleted=*/is_deleted_cb);
419  }
420  }
421  return join_loops;
422 }
llvm::Value * values_buffer
Definition: JoinLoop.h:47
std::string join(T const &container, std::string const &delim)
std::unique_ptr< CgenState > cgen_state_
Definition: Execute.h:855
CHECK(cgen_state)
llvm::Value * get_arg_by_name(llvm::Function *func, const std::string &name)
Definition: Execute.h:95
#define INJECT_TIMER(DESC)
Definition: measure.h:91
const JoinQualsPerNestingLevel join_quals
llvm::Value * slot_lookup_result
Definition: JoinLoop.h:45
#define CHECK_LT(x, y)
Definition: Logger.h:207
llvm::Value * addJoinLoopIterator(const std::vector< llvm::Value * > &prev_iters, const size_t level_idx)
Definition: IRCodegen.cpp:530
void check_if_loop_join_is_allowed(RelAlgExecutionUnit &ra_exe_unit, const ExecutionOptions &eo, const std::vector< InputTableInfo > &query_infos, const size_t level_idx, const std::string &fail_reason)
Definition: IRCodegen.cpp:234
std::vector< JoinLoop > buildJoinLoops(RelAlgExecutionUnit &ra_exe_unit, const CompilationOptions &co, const ExecutionOptions &eo, const std::vector< InputTableInfo > &query_infos, ColumnCacheMap &column_cache)
Definition: IRCodegen.cpp:256
std::function< llvm::Value *(const std::vector< llvm::Value * > &, llvm::Value *)> buildIsDeletedCb(const RelAlgExecutionUnit &ra_exe_unit, const size_t level_idx, const CompilationOptions &co)
Definition: IRCodegen.cpp:425
#define VLOG(n)
Definition: Logger.h:291
std::shared_ptr< JoinHashTableInterface > buildCurrentLevelHashTable(const JoinCondition &current_level_join_conditions, RelAlgExecutionUnit &ra_exe_unit, const CompilationOptions &co, const std::vector< InputTableInfo > &query_infos, ColumnCacheMap &column_cache, std::vector< std::string > &fail_reasons)
Definition: IRCodegen.cpp:484


void Executor::buildSelectedFragsMapping ( std::vector< std::vector< size_t >> &  selected_fragments_crossjoin,
std::vector< size_t > &  local_col_to_frag_pos,
const std::list< std::shared_ptr< const InputColDescriptor >> &  col_global_ids,
const FragmentsList &  selected_fragments,
const RelAlgExecutionUnit &  ra_exe_unit 
)
private

Definition at line 2573 of file Execute.cpp.

References CHECK(), CHECK_EQ, CHECK_LT, and RelAlgExecutionUnit::input_descs.

2578  {
2579  local_col_to_frag_pos.resize(plan_state_->global_to_local_col_ids_.size());
2580  size_t frag_pos{0};
2581  const auto& input_descs = ra_exe_unit.input_descs;
2582  for (size_t scan_idx = 0; scan_idx < input_descs.size(); ++scan_idx) {
2583  const int table_id = input_descs[scan_idx].getTableId();
2584  CHECK_EQ(selected_fragments[scan_idx].table_id, table_id);
2585  selected_fragments_crossjoin.push_back(
2586  getFragmentCount(selected_fragments, scan_idx, ra_exe_unit));
2587  for (const auto& col_id : col_global_ids) {
2588  CHECK(col_id);
2589  const auto& input_desc = col_id->getScanDesc();
2590  if (input_desc.getTableId() != table_id ||
2591  input_desc.getNestLevel() != static_cast<int>(scan_idx)) {
2592  continue;
2593  }
2594  auto it = plan_state_->global_to_local_col_ids_.find(*col_id);
2595  CHECK(it != plan_state_->global_to_local_col_ids_.end());
2596  CHECK_LT(static_cast<size_t>(it->second),
2597  plan_state_->global_to_local_col_ids_.size());
2598  local_col_to_frag_pos[it->second] = frag_pos;
2599  }
2600  ++frag_pos;
2601  }
2602 }
#define CHECK_EQ(x, y)
Definition: Logger.h:205
std::vector< InputDescriptor > input_descs
CHECK(cgen_state)
std::unique_ptr< PlanState > plan_state_
Definition: Execute.h:870
#define CHECK_LT(x, y)
Definition: Logger.h:207
std::vector< size_t > getFragmentCount(const FragmentsList &selected_fragments, const size_t scan_idx, const RelAlgExecutionUnit &ra_exe_unit)
Definition: Execute.cpp:2559


void Executor::buildSelectedFragsMappingForUnion ( std::vector< std::vector< size_t >> &  selected_fragments_crossjoin,
std::vector< size_t > &  local_col_to_frag_pos,
const std::list< std::shared_ptr< const InputColDescriptor >> &  col_global_ids,
const FragmentsList &  selected_fragments,
const RelAlgExecutionUnit &  ra_exe_unit 
)
private

Definition at line 2604 of file Execute.cpp.

References CHECK(), CHECK_LT, and RelAlgExecutionUnit::input_descs.

2609  {
2610  local_col_to_frag_pos.resize(plan_state_->global_to_local_col_ids_.size());
2611  size_t frag_pos{0};
2612  const auto& input_descs = ra_exe_unit.input_descs;
2613  for (size_t scan_idx = 0; scan_idx < input_descs.size(); ++scan_idx) {
2614  const int table_id = input_descs[scan_idx].getTableId();
2615  // selected_fragments here is from assignFragsToKernelDispatch
2616  // execution_kernel.fragments
2617  if (selected_fragments[0].table_id != table_id) { // TODO 0
2618  continue;
2619  }
2620  // CHECK_EQ(selected_fragments[scan_idx].table_id, table_id);
2621  selected_fragments_crossjoin.push_back(
2622  // getFragmentCount(selected_fragments, scan_idx, ra_exe_unit));
2623  {size_t(1)}); // TODO
2624  for (const auto& col_id : col_global_ids) {
2625  CHECK(col_id);
2626  const auto& input_desc = col_id->getScanDesc();
2627  if (input_desc.getTableId() != table_id ||
2628  input_desc.getNestLevel() != static_cast<int>(scan_idx)) {
2629  continue;
2630  }
2631  auto it = plan_state_->global_to_local_col_ids_.find(*col_id);
2632  CHECK(it != plan_state_->global_to_local_col_ids_.end());
2633  CHECK_LT(static_cast<size_t>(it->second),
2634  plan_state_->global_to_local_col_ids_.size());
2635  local_col_to_frag_pos[it->second] = frag_pos;
2636  }
2637  ++frag_pos;
2638  }
2639 }
std::vector< InputDescriptor > input_descs
CHECK(cgen_state)
std::unique_ptr< PlanState > plan_state_
Definition: Execute.h:870
#define CHECK_LT(x, y)
Definition: Logger.h:207


llvm::Value * Executor::castToFP ( llvm::Value *  val)
private

Definition at line 3148 of file Execute.cpp.

References logger::FATAL, LOG, and to_string().

3148  {
3149  if (!val->getType()->isIntegerTy()) {
3150  return val;
3151  }
3152 
3153  auto val_width = static_cast<llvm::IntegerType*>(val->getType())->getBitWidth();
3154  llvm::Type* dest_ty{nullptr};
3155  switch (val_width) {
3156  case 32:
3157  dest_ty = llvm::Type::getFloatTy(cgen_state_->context_);
3158  break;
3159  case 64:
3160  dest_ty = llvm::Type::getDoubleTy(cgen_state_->context_);
3161  break;
3162  default:
3163  LOG(FATAL) << "Unsupported FP width: " << std::to_string(val_width);
3164  }
3165  return cgen_state_->ir_builder_.CreateSIToFP(val, dest_ty);
3166 }
#define LOG(tag)
Definition: Logger.h:188
std::unique_ptr< CgenState > cgen_state_
Definition: Execute.h:855
std::string to_string(char const *&&v)


llvm::Value * Executor::castToIntPtrTyIn ( llvm::Value *  val,
const size_t  bit_width 
)
private

Definition at line 3168 of file Execute.cpp.

References CHECK(), CHECK_LT, and get_int_type().

3168  {
3169  CHECK(val->getType()->isPointerTy());
3170 
3171  const auto val_ptr_type = static_cast<llvm::PointerType*>(val->getType());
3172  const auto val_type = val_ptr_type->getElementType();
3173  size_t val_width = 0;
3174  if (val_type->isIntegerTy()) {
3175  val_width = val_type->getIntegerBitWidth();
3176  } else {
3177  if (val_type->isFloatTy()) {
3178  val_width = 32;
3179  } else {
3180  CHECK(val_type->isDoubleTy());
3181  val_width = 64;
3182  }
3183  }
3184  CHECK_LT(size_t(0), val_width);
3185  if (bitWidth == val_width) {
3186  return val;
3187  }
3188  return cgen_state_->ir_builder_.CreateBitCast(
3189  val, llvm::PointerType::get(get_int_type(bitWidth, cgen_state_->context_), 0));
3190 }
std::unique_ptr< CgenState > cgen_state_
Definition: Execute.h:855
llvm::Type * get_int_type(const int width, llvm::LLVMContext &context)
CHECK(cgen_state)
#define CHECK_LT(x, y)
Definition: Logger.h:207


template<typename SESSION_MAP_LOCK >
bool Executor::checkCurrentQuerySession ( const std::string &  candidate_query_session,
SESSION_MAP_LOCK &  read_lock 
)
template<>
bool Executor::checkCurrentQuerySession ( const std::string &  candidate_query_session,
mapd_shared_lock< mapd_shared_mutex > &  read_lock 
)

Definition at line 3534 of file Execute.cpp.

3535  {
3536  // if current_query_session is equal to the candidate_query_session,
3537  // or it is empty session we consider
3538  return (current_query_session_ == candidate_query_session);
3539 }
static std::string current_query_session_
Definition: Execute.h:917
template<typename SESSION_MAP_LOCK >
bool Executor::checkIsQuerySessionInterrupted ( const std::string &  query_session,
SESSION_MAP_LOCK &  read_lock 
)
template<>
bool Executor::checkIsQuerySessionInterrupted ( const std::string &  query_session,
mapd_shared_lock< mapd_shared_mutex > &  read_lock 
)

Definition at line 3567 of file Execute.cpp.

3569  {
3570  auto flag_it = queries_interrupt_flag_.find(query_session);
3571  return flag_it != queries_interrupt_flag_.end() && flag_it->second;
3572 }
static InterruptFlagMap queries_interrupt_flag_
Definition: Execute.h:919
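The interrupt check is a plain map lookup; sessions that were never registered are treated as not interrupted. Below is a standalone sketch assuming InterruptFlagMap is a string-to-bool map (a stand-in for the typedef in Execute.h).

#include <iostream>
#include <string>
#include <unordered_map>

using InterruptFlagMap = std::unordered_map<std::string, bool>;
static InterruptFlagMap queries_interrupt_flag;

bool check_is_query_session_interrupted(const std::string& query_session) {
  const auto flag_it = queries_interrupt_flag.find(query_session);
  return flag_it != queries_interrupt_flag.end() && flag_it->second;
}

int main() {
  queries_interrupt_flag["s1"] = true;   // setQuerySessionAsInterrupted analogue
  std::cout << check_is_query_session_interrupted("s1") << " "    // 1
            << check_is_query_session_interrupted("s2") << "\n";  // 0
}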
void Executor::clearMemory ( const Data_Namespace::MemoryLevel  memory_level)
static

Definition at line 170 of file Execute.cpp.

References Data_Namespace::DataMgr::clearMemory(), Data_Namespace::CPU_LEVEL, Catalog_Namespace::SysCatalog::getDataMgr(), Data_Namespace::GPU_LEVEL, Catalog_Namespace::SysCatalog::instance(), and CacheInvalidator< CACHE_HOLDING_TYPES >::invalidateCaches().

Referenced by DBHandler::clear_cpu_memory(), DBHandler::clear_gpu_memory(), QueryRunner::QueryRunner::clearCpuMemory(), and QueryRunner::QueryRunner::clearGpuMemory().

170  {
171  switch (memory_level) {
174  mapd_unique_lock<mapd_shared_mutex> flush_lock(
175  execute_mutex_); // Don't flush memory while queries are running
176 
178  if (memory_level == Data_Namespace::MemoryLevel::CPU_LEVEL) {
179  // The hash table cache uses CPU memory not managed by the buffer manager. In the
180  // future, we should manage these allocations with the buffer manager directly.
181  // For now, assume the user wants to purge the hash table cache when they clear
182  // CPU memory (currently used in ExecuteTest to lower memory pressure)
184  }
185  break;
186  }
187  default: {
188  throw std::runtime_error(
189  "Clearing memory levels other than the CPU level or GPU level is not "
190  "supported.");
191  }
192  }
193 }
static mapd_shared_mutex execute_mutex_
Definition: Execute.h:926
void clearMemory(const MemoryLevel memLevel)
Definition: DataMgr.cpp:370
static void invalidateCaches()
Data_Namespace::DataMgr & getDataMgr() const
Definition: SysCatalog.h:189
static SysCatalog & instance()
Definition: SysCatalog.h:288

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void Executor::clearMetaInfoCache ( )
private

Definition at line 346 of file Execute.cpp.

References input_table_info_cache_().

346  {
351 }
AggregatedColRange agg_col_range_cache_
Definition: Execute.h:913
StringDictionaryGenerations string_dictionary_generations_
Definition: Execute.h:914
InputTableInfoCache input_table_info_cache_
Definition: Execute.h:912
TableGenerations table_generations_
Definition: Execute.h:915

+ Here is the call graph for this function:

llvm::Value * Executor::codegenAggregateWindowState ( )
private

Definition at line 319 of file WindowFunctionIR.cpp.

References AVG, COUNT, anonymous_namespace{WindowFunctionIR.cpp}::get_adjusted_window_type_info(), get_int_type(), WindowProjectNodeContext::getActiveWindowFunctionContext(), Analyzer::WindowFunction::getKind(), kDECIMAL, kDOUBLE, and kFLOAT.

319  {
320  const auto pi32_type =
321  llvm::PointerType::get(get_int_type(32, cgen_state_->context_), 0);
322  const auto pi64_type =
323  llvm::PointerType::get(get_int_type(64, cgen_state_->context_), 0);
324  const auto window_func_context =
326  const Analyzer::WindowFunction* window_func = window_func_context->getWindowFunction();
327  const auto window_func_ti = get_adjusted_window_type_info(window_func);
328  const auto aggregate_state_type =
329  window_func_ti.get_type() == kFLOAT ? pi32_type : pi64_type;
330  auto aggregate_state = aggregateWindowStatePtr();
331  if (window_func->getKind() == SqlWindowFunctionKind::AVG) {
332  const auto aggregate_state_count_i64 = cgen_state_->llInt(
333  reinterpret_cast<const int64_t>(window_func_context->aggregateStateCount()));
334  auto aggregate_state_count = cgen_state_->ir_builder_.CreateIntToPtr(
335  aggregate_state_count_i64, aggregate_state_type);
336  const auto double_null_lv = cgen_state_->inlineFpNull(SQLTypeInfo(kDOUBLE));
337  switch (window_func_ti.get_type()) {
338  case kFLOAT: {
339  return cgen_state_->emitCall(
340  "load_avg_float", {aggregate_state, aggregate_state_count, double_null_lv});
341  }
342  case kDOUBLE: {
343  return cgen_state_->emitCall(
344  "load_avg_double", {aggregate_state, aggregate_state_count, double_null_lv});
345  }
346  case kDECIMAL: {
347  return cgen_state_->emitCall(
348  "load_avg_decimal",
349  {aggregate_state,
350  aggregate_state_count,
351  double_null_lv,
352  cgen_state_->llInt<int32_t>(window_func_ti.get_scale())});
353  }
354  default: {
355  return cgen_state_->emitCall(
356  "load_avg_int", {aggregate_state, aggregate_state_count, double_null_lv});
357  }
358  }
359  }
360  if (window_func->getKind() == SqlWindowFunctionKind::COUNT) {
361  return cgen_state_->ir_builder_.CreateLoad(aggregate_state);
362  }
363  switch (window_func_ti.get_type()) {
364  case kFLOAT: {
365  return cgen_state_->emitCall("load_float", {aggregate_state});
366  }
367  case kDOUBLE: {
368  return cgen_state_->emitCall("load_double", {aggregate_state});
369  }
370  default: {
371  return cgen_state_->ir_builder_.CreateLoad(aggregate_state);
372  }
373  }
374 }
SqlWindowFunctionKind getKind() const
Definition: Analyzer.h:1448
llvm::Value * aggregateWindowStatePtr()
std::unique_ptr< CgenState > cgen_state_
Definition: Execute.h:855
llvm::Type * get_int_type(const int width, llvm::LLVMContext &context)
static WindowFunctionContext * getActiveWindowFunctionContext(Executor *executor)
SQLTypeInfo get_adjusted_window_type_info(const Analyzer::WindowFunction *window_func)

+ Here is the call graph for this function:
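The function chooses which runtime helper to call from the window function's SQL type; the sketch below restates only that name dispatch for AVG, with a hypothetical SqlType enum in place of SQLTypeInfo.

#include <iostream>
#include <string>

enum class SqlType { Float, Double, Decimal, Int };

std::string load_avg_helper_name(SqlType type) {
  switch (type) {
    case SqlType::Float:   return "load_avg_float";
    case SqlType::Double:  return "load_avg_double";
    case SqlType::Decimal: return "load_avg_decimal";  // also receives the scale
    default:               return "load_avg_int";
  }
}

int main() {
  std::cout << load_avg_helper_name(SqlType::Decimal) << "\n";
}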

void Executor::codegenJoinLoops ( const std::vector< JoinLoop > &  join_loops,
const RelAlgExecutionUnit ra_exe_unit,
GroupByAndAggregate group_by_and_aggregate,
llvm::Function *  query_func,
llvm::BasicBlock *  entry_bb,
const QueryMemoryDescriptor query_mem_desc,
const CompilationOptions co,
const ExecutionOptions eo 
)
private

Definition at line 547 of file IRCodegen.cpp.

References ExecutionOptions::allow_runtime_query_interrupt, CodeGenerator::cgen_state_, JoinLoop::codegen(), CgenState::context_, CompilationOptions::device_type, CgenState::ir_builder_, CgenState::llInt(), CgenState::needs_error_check_, CodeGenerator::posArg(), CgenState::row_func_, and ExecutionOptions::with_dynamic_watchdog.

554  {
555  const auto exit_bb =
556  llvm::BasicBlock::Create(cgen_state_->context_, "exit", cgen_state_->row_func_);
557  cgen_state_->ir_builder_.SetInsertPoint(exit_bb);
558  cgen_state_->ir_builder_.CreateRet(cgen_state_->llInt<int32_t>(0));
559  cgen_state_->ir_builder_.SetInsertPoint(entry_bb);
560  CodeGenerator code_generator(this);
561  const auto loops_entry_bb = JoinLoop::codegen(
562  join_loops,
563  [this,
564  query_func,
565  &query_mem_desc,
566  &co,
567  &eo,
568  &group_by_and_aggregate,
569  &join_loops,
570  &ra_exe_unit](const std::vector<llvm::Value*>& prev_iters) {
571  addJoinLoopIterator(prev_iters, join_loops.size());
572  auto& builder = cgen_state_->ir_builder_;
573  const auto loop_body_bb = llvm::BasicBlock::Create(
574  builder.getContext(), "loop_body", builder.GetInsertBlock()->getParent());
575  builder.SetInsertPoint(loop_body_bb);
576  const bool can_return_error =
577  compileBody(ra_exe_unit, group_by_and_aggregate, query_mem_desc, co);
578  if (can_return_error || cgen_state_->needs_error_check_ ||
580  createErrorCheckControlFlow(query_func,
583  co.device_type);
584  }
585  return loop_body_bb;
586  },
587  code_generator.posArg(nullptr),
588  exit_bb,
589  cgen_state_->ir_builder_);
590  cgen_state_->ir_builder_.SetInsertPoint(entry_bb);
591  cgen_state_->ir_builder_.CreateBr(loops_entry_bb);
592 }
std::unique_ptr< CgenState > cgen_state_
Definition: Execute.h:855
bool compileBody(const RelAlgExecutionUnit &ra_exe_unit, GroupByAndAggregate &group_by_and_aggregate, const QueryMemoryDescriptor &query_mem_desc, const CompilationOptions &co, const GpuSharedMemoryContext &gpu_smem_context={})
const bool with_dynamic_watchdog
ExecutorDeviceType device_type
void createErrorCheckControlFlow(llvm::Function *query_func, bool run_with_dynamic_watchdog, bool run_with_allowing_runtime_interrupt, ExecutorDeviceType device_type)
static llvm::BasicBlock * codegen(const std::vector< JoinLoop > &join_loops, const std::function< llvm::BasicBlock *(const std::vector< llvm::Value * > &)> &body_codegen, llvm::Value *outer_iter, llvm::BasicBlock *exit_bb, llvm::IRBuilder<> &builder)
Definition: JoinLoop.cpp:45
llvm::Value * addJoinLoopIterator(const std::vector< llvm::Value * > &prev_iters, const size_t level_idx)
Definition: IRCodegen.cpp:530
const bool allow_runtime_query_interrupt

+ Here is the call graph for this function:

llvm::BasicBlock * Executor::codegenSkipDeletedOuterTableRow ( const RelAlgExecutionUnit ra_exe_unit,
const CompilationOptions co 
)
private

Definition at line 2170 of file NativeCodegen.cpp.

2172  {
2173  if (!co.filter_on_deleted_column) {
2174  return nullptr;
2175  }
2176  CHECK(!ra_exe_unit.input_descs.empty());
2177  const auto& outer_input_desc = ra_exe_unit.input_descs[0];
2178  if (outer_input_desc.getSourceType() != InputSourceType::TABLE) {
2179  return nullptr;
2180  }
2181  const auto td = catalog_->getMetadataForTable(outer_input_desc.getTableId());
2182  CHECK(td);
2183  const auto deleted_cd = catalog_->getDeletedColumnIfRowsDeleted(td);
2184  if (!deleted_cd) {
2185  return nullptr;
2186  }
2187  CHECK(deleted_cd->columnType.is_boolean());
2188  const auto deleted_expr =
2189  makeExpr<Analyzer::ColumnVar>(deleted_cd->columnType,
2190  outer_input_desc.getTableId(),
2191  deleted_cd->columnId,
2192  outer_input_desc.getNestLevel());
2193  CodeGenerator code_generator(this);
2194  const auto is_deleted =
2195  code_generator.toBool(code_generator.codegen(deleted_expr.get(), true, co).front());
2196  const auto is_deleted_bb = llvm::BasicBlock::Create(
2197  cgen_state_->context_, "is_deleted", cgen_state_->row_func_);
2198  llvm::BasicBlock* bb = llvm::BasicBlock::Create(
2199  cgen_state_->context_, "is_not_deleted", cgen_state_->row_func_);
2200  cgen_state_->ir_builder_.CreateCondBr(is_deleted, is_deleted_bb, bb);
2201  cgen_state_->ir_builder_.SetInsertPoint(is_deleted_bb);
2202  cgen_state_->ir_builder_.CreateRet(cgen_state_->llInt<int32_t>(0));
2203  cgen_state_->ir_builder_.SetInsertPoint(bb);
2204  return bb;
2205 }
std::vector< InputDescriptor > input_descs
std::unique_ptr< CgenState > cgen_state_
Definition: Execute.h:855
CHECK(cgen_state)
const Catalog_Namespace::Catalog * catalog_
Definition: Execute.h:900
const ColumnDescriptor * getDeletedColumnIfRowsDeleted(const TableDescriptor *td) const
Definition: Catalog.cpp:2774
const TableDescriptor * getMetadataForTable(const std::string &tableName, const bool populateFragmenter=true) const
Returns a pointer to a const TableDescriptor struct matching the provided tableName.
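Conceptually, the generated block is an early return taken whenever the hidden boolean delete column of the outer table is set, so deleted rows never reach the rest of the row function. The loop below is only a scalar analogue of that control flow, with a made-up Row struct.

#include <iostream>
#include <vector>

struct Row {
  int id;
  bool deleted;  // stand-in for the hidden delete column
};

int main() {
  const std::vector<Row> rows = {{1, false}, {2, true}, {3, false}};
  int processed = 0;
  for (const auto& row : rows) {
    if (row.deleted) {
      continue;  // analogous to branching into the "is_deleted" early-return block
    }
    ++processed;
  }
  std::cout << processed << "\n";  // 2
}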
void Executor::codegenWindowAvgEpilogue ( llvm::Value *  crt_val,
llvm::Value *  window_func_null_val,
llvm::Value *  multiplicity_lv 
)
private

Definition at line 283 of file WindowFunctionIR.cpp.

References anonymous_namespace{WindowFunctionIR.cpp}::get_adjusted_window_type_info(), get_int_type(), WindowProjectNodeContext::getActiveWindowFunctionContext(), kDOUBLE, and kFLOAT.

285  {
286  const auto window_func_context =
288  const auto window_func = window_func_context->getWindowFunction();
289  const auto window_func_ti = get_adjusted_window_type_info(window_func);
290  const auto pi32_type =
291  llvm::PointerType::get(get_int_type(32, cgen_state_->context_), 0);
292  const auto pi64_type =
293  llvm::PointerType::get(get_int_type(64, cgen_state_->context_), 0);
294  const auto aggregate_state_type =
295  window_func_ti.get_type() == kFLOAT ? pi32_type : pi64_type;
296  const auto aggregate_state_count_i64 = cgen_state_->llInt(
297  reinterpret_cast<const int64_t>(window_func_context->aggregateStateCount()));
298  auto aggregate_state_count = cgen_state_->ir_builder_.CreateIntToPtr(
299  aggregate_state_count_i64, aggregate_state_type);
300  std::string agg_count_func_name = "agg_count";
301  switch (window_func_ti.get_type()) {
302  case kFLOAT: {
303  agg_count_func_name += "_float";
304  break;
305  }
306  case kDOUBLE: {
307  agg_count_func_name += "_double";
308  break;
309  }
310  default: {
311  break;
312  }
313  }
314  agg_count_func_name += "_skip_val";
315  cgen_state_->emitCall(agg_count_func_name,
316  {aggregate_state_count, crt_val, window_func_null_val});
317 }
std::unique_ptr< CgenState > cgen_state_
Definition: Execute.h:855
llvm::Type * get_int_type(const int width, llvm::LLVMContext &context)
static WindowFunctionContext * getActiveWindowFunctionContext(Executor *executor)
SQLTypeInfo get_adjusted_window_type_info(const Analyzer::WindowFunction *window_func)

+ Here is the call graph for this function:

llvm::Value * Executor::codegenWindowFunction ( const size_t  target_index,
const CompilationOptions co 
)
private

Definition at line 21 of file WindowFunctionIR.cpp.

References WindowProjectNodeContext::activateWindowFunctionContext(), run_benchmark_import::args, AVG, CHECK(), CHECK_EQ, CodeGenerator::codegen(), COUNT, CUME_DIST, DENSE_RANK, logger::FATAL, FIRST_VALUE, WindowProjectNodeContext::get(), WindowFunctionContext::getWindowFunction(), LAG, LAST_VALUE, LEAD, LOG, MAX, MIN, NTILE, PERCENT_RANK, CodeGenerator::posArg(), RANK, ROW_NUMBER, and SUM.

22  {
23  CodeGenerator code_generator(this);
24  const auto window_func_context =
26  target_index);
27  const auto window_func = window_func_context->getWindowFunction();
28  switch (window_func->getKind()) {
33  return cgen_state_->emitCall("row_number_window_func",
34  {cgen_state_->llInt(reinterpret_cast<const int64_t>(
35  window_func_context->output())),
36  code_generator.posArg(nullptr)});
37  }
40  return cgen_state_->emitCall("percent_window_func",
41  {cgen_state_->llInt(reinterpret_cast<const int64_t>(
42  window_func_context->output())),
43  code_generator.posArg(nullptr)});
44  }
50  const auto& args = window_func->getArgs();
51  CHECK(!args.empty());
52  const auto arg_lvs = code_generator.codegen(args.front().get(), true, co);
53  CHECK_EQ(arg_lvs.size(), size_t(1));
54  return arg_lvs.front();
55  }
62  }
63  default: {
64  LOG(FATAL) << "Invalid window function kind";
65  }
66  }
67  return nullptr;
68 }
#define CHECK_EQ(x, y)
Definition: Logger.h:205
#define LOG(tag)
Definition: Logger.h:188
std::unique_ptr< CgenState > cgen_state_
Definition: Execute.h:855
static const WindowProjectNodeContext * get(Executor *executor)
CHECK(cgen_state)
const WindowFunctionContext * activateWindowFunctionContext(Executor *executor, const size_t target_index) const
llvm::Value * codegenWindowFunctionAggregate(const CompilationOptions &co)
const Analyzer::WindowFunction * getWindowFunction() const

+ Here is the call graph for this function:

llvm::Value * Executor::codegenWindowFunctionAggregate ( const CompilationOptions co)
private

Definition at line 138 of file WindowFunctionIR.cpp.

References AVG, CHECK(), WindowProjectNodeContext::get(), get_int_type(), and WindowProjectNodeContext::getActiveWindowFunctionContext().

138  {
139  const auto reset_state_false_bb = codegenWindowResetStateControlFlow();
140  auto aggregate_state = aggregateWindowStatePtr();
141  llvm::Value* aggregate_state_count = nullptr;
142  const auto window_func_context =
144  const auto window_func = window_func_context->getWindowFunction();
145  if (window_func->getKind() == SqlWindowFunctionKind::AVG) {
146  const auto aggregate_state_count_i64 = cgen_state_->llInt(
147  reinterpret_cast<const int64_t>(window_func_context->aggregateStateCount()));
148  const auto pi64_type =
149  llvm::PointerType::get(get_int_type(64, cgen_state_->context_), 0);
150  aggregate_state_count =
151  cgen_state_->ir_builder_.CreateIntToPtr(aggregate_state_count_i64, pi64_type);
152  }
153  codegenWindowFunctionStateInit(aggregate_state);
154  if (window_func->getKind() == SqlWindowFunctionKind::AVG) {
155  const auto count_zero = cgen_state_->llInt(int64_t(0));
156  cgen_state_->emitCall("agg_id", {aggregate_state_count, count_zero});
157  }
158  cgen_state_->ir_builder_.CreateBr(reset_state_false_bb);
159  cgen_state_->ir_builder_.SetInsertPoint(reset_state_false_bb);
161  return codegenWindowFunctionAggregateCalls(aggregate_state, co);
162 }
llvm::Value * aggregateWindowStatePtr()
std::unique_ptr< CgenState > cgen_state_
Definition: Execute.h:855
llvm::Type * get_int_type(const int width, llvm::LLVMContext &context)
static WindowFunctionContext * getActiveWindowFunctionContext(Executor *executor)
static const WindowProjectNodeContext * get(Executor *executor)
CHECK(cgen_state)
void codegenWindowFunctionStateInit(llvm::Value *aggregate_state)
llvm::Value * codegenWindowFunctionAggregateCalls(llvm::Value *aggregate_state, const CompilationOptions &co)
llvm::BasicBlock * codegenWindowResetStateControlFlow()

+ Here is the call graph for this function:

llvm::Value * Executor::codegenWindowFunctionAggregateCalls ( llvm::Value *  aggregate_state,
const CompilationOptions co 
)
private

Definition at line 241 of file WindowFunctionIR.cpp.

References run_benchmark_import::args, AVG, CHECK(), CHECK_EQ, CodeGenerator::codegen(), CodeGenerator::codegenCastBetweenIntTypes(), COUNT, anonymous_namespace{WindowFunctionIR.cpp}::get_adjusted_window_type_info(), anonymous_namespace{WindowFunctionIR.cpp}::get_window_agg_name(), WindowProjectNodeContext::getActiveWindowFunctionContext(), kFLOAT, and SUM.

242  {
243  const auto window_func_context =
245  const auto window_func = window_func_context->getWindowFunction();
246  const auto window_func_ti = get_adjusted_window_type_info(window_func);
247  const auto window_func_null_val =
248  window_func_ti.is_fp()
249  ? cgen_state_->inlineFpNull(window_func_ti)
250  : cgen_state_->castToTypeIn(cgen_state_->inlineIntNull(window_func_ti), 64);
251  const auto& args = window_func->getArgs();
252  llvm::Value* crt_val;
253  if (args.empty()) {
254  CHECK(window_func->getKind() == SqlWindowFunctionKind::COUNT);
255  crt_val = cgen_state_->llInt(int64_t(1));
256  } else {
257  CodeGenerator code_generator(this);
258  const auto arg_lvs = code_generator.codegen(args.front().get(), true, co);
259  CHECK_EQ(arg_lvs.size(), size_t(1));
260  if (window_func->getKind() == SqlWindowFunctionKind::SUM && !window_func_ti.is_fp()) {
261  crt_val = code_generator.codegenCastBetweenIntTypes(
262  arg_lvs.front(), args.front()->get_type_info(), window_func_ti, false);
263  } else {
264  crt_val = window_func_ti.get_type() == kFLOAT
265  ? arg_lvs.front()
266  : cgen_state_->castToTypeIn(arg_lvs.front(), 64);
267  }
268  }
269  const auto agg_name = get_window_agg_name(window_func->getKind(), window_func_ti);
270  llvm::Value* multiplicity_lv = nullptr;
271  if (args.empty()) {
272  cgen_state_->emitCall(agg_name, {aggregate_state, crt_val});
273  } else {
274  cgen_state_->emitCall(agg_name + "_skip_val",
275  {aggregate_state, crt_val, window_func_null_val});
276  }
277  if (window_func->getKind() == SqlWindowFunctionKind::AVG) {
278  codegenWindowAvgEpilogue(crt_val, window_func_null_val, multiplicity_lv);
279  }
281 }
#define CHECK_EQ(x, y)
Definition: Logger.h:205
std::unique_ptr< CgenState > cgen_state_
Definition: Execute.h:855
static WindowFunctionContext * getActiveWindowFunctionContext(Executor *executor)
std::string get_window_agg_name(const SqlWindowFunctionKind kind, const SQLTypeInfo &window_func_ti)
void codegenWindowAvgEpilogue(llvm::Value *crt_val, llvm::Value *window_func_null_val, llvm::Value *multiplicity_lv)
CHECK(cgen_state)
llvm::Value * codegenAggregateWindowState()
SQLTypeInfo get_adjusted_window_type_info(const Analyzer::WindowFunction *window_func)

+ Here is the call graph for this function:

void Executor::codegenWindowFunctionStateInit ( llvm::Value *  aggregate_state)
private

Definition at line 192 of file WindowFunctionIR.cpp.

References COUNT, anonymous_namespace{WindowFunctionIR.cpp}::get_adjusted_window_type_info(), get_int_type(), WindowProjectNodeContext::getActiveWindowFunctionContext(), kDOUBLE, and kFLOAT.

192  {
193  const auto window_func_context =
195  const auto window_func = window_func_context->getWindowFunction();
196  const auto window_func_ti = get_adjusted_window_type_info(window_func);
197  const auto window_func_null_val =
198  window_func_ti.is_fp()
199  ? cgen_state_->inlineFpNull(window_func_ti)
200  : cgen_state_->castToTypeIn(cgen_state_->inlineIntNull(window_func_ti), 64);
201  llvm::Value* window_func_init_val;
202  if (window_func_context->getWindowFunction()->getKind() ==
204  switch (window_func_ti.get_type()) {
205  case kFLOAT: {
206  window_func_init_val = cgen_state_->llFp(float(0));
207  break;
208  }
209  case kDOUBLE: {
210  window_func_init_val = cgen_state_->llFp(double(0));
211  break;
212  }
213  default: {
214  window_func_init_val = cgen_state_->llInt(int64_t(0));
215  break;
216  }
217  }
218  } else {
219  window_func_init_val = window_func_null_val;
220  }
221  const auto pi32_type =
222  llvm::PointerType::get(get_int_type(32, cgen_state_->context_), 0);
223  switch (window_func_ti.get_type()) {
224  case kDOUBLE: {
225  cgen_state_->emitCall("agg_id_double", {aggregate_state, window_func_init_val});
226  break;
227  }
228  case kFLOAT: {
229  aggregate_state =
230  cgen_state_->ir_builder_.CreateBitCast(aggregate_state, pi32_type);
231  cgen_state_->emitCall("agg_id_float", {aggregate_state, window_func_init_val});
232  break;
233  }
234  default: {
235  cgen_state_->emitCall("agg_id", {aggregate_state, window_func_init_val});
236  break;
237  }
238  }
239 }
std::unique_ptr< CgenState > cgen_state_
Definition: Execute.h:855
llvm::Type * get_int_type(const int width, llvm::LLVMContext &context)
static WindowFunctionContext * getActiveWindowFunctionContext(Executor *executor)
SQLTypeInfo get_adjusted_window_type_info(const Analyzer::WindowFunction *window_func)

+ Here is the call graph for this function:

llvm::BasicBlock * Executor::codegenWindowResetStateControlFlow ( )
private

Definition at line 164 of file WindowFunctionIR.cpp.

References WindowProjectNodeContext::getActiveWindowFunctionContext(), CodeGenerator::posArg(), and CodeGenerator::toBool().

164  {
165  const auto window_func_context =
167  const auto bitset = cgen_state_->llInt(
168  reinterpret_cast<const int64_t>(window_func_context->partitionStart()));
169  const auto min_val = cgen_state_->llInt(int64_t(0));
170  const auto max_val = cgen_state_->llInt(window_func_context->elementCount() - 1);
171  const auto null_val = cgen_state_->llInt(inline_int_null_value<int64_t>());
172  const auto null_bool_val = cgen_state_->llInt<int8_t>(inline_int_null_value<int8_t>());
173  CodeGenerator code_generator(this);
174  const auto reset_state =
175  code_generator.toBool(cgen_state_->emitCall("bit_is_set",
176  {bitset,
177  code_generator.posArg(nullptr),
178  min_val,
179  max_val,
180  null_val,
181  null_bool_val}));
182  const auto reset_state_true_bb = llvm::BasicBlock::Create(
183  cgen_state_->context_, "reset_state.true", cgen_state_->row_func_);
184  const auto reset_state_false_bb = llvm::BasicBlock::Create(
185  cgen_state_->context_, "reset_state.false", cgen_state_->row_func_);
186  cgen_state_->ir_builder_.CreateCondBr(
187  reset_state, reset_state_true_bb, reset_state_false_bb);
188  cgen_state_->ir_builder_.SetInsertPoint(reset_state_true_bb);
189  return reset_state_false_bb;
190 }
std::unique_ptr< CgenState > cgen_state_
Definition: Execute.h:855
static WindowFunctionContext * getActiveWindowFunctionContext(Executor *executor)

+ Here is the call graph for this function:
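The branch hinges on the "bit_is_set" runtime call: bit i of the partition-start bitset marks the first row of a partition, and only those rows reset the aggregate state. The sketch below shows that bitset test in isolation with a deliberately simplified signature (the real helper also takes min/max and null sentinels).

#include <cstdint>
#include <iostream>
#include <vector>

bool bit_is_set(const int8_t* bitset, int64_t pos) {
  return (bitset[pos >> 3] >> (pos & 7)) & 1;
}

int main() {
  std::vector<int8_t> partition_start(2, 0);
  partition_start[0] |= 1 << 0;  // row 0 starts the first partition
  partition_start[0] |= 1 << 5;  // row 5 starts the second partition
  for (const int64_t pos : {0, 3, 5}) {
    std::cout << "row " << pos
              << (bit_is_set(partition_start.data(), pos) ? ": reset aggregate state\n"
                                                          : ": keep accumulating\n");
  }
}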

ResultSetPtr Executor::collectAllDeviceResults ( SharedKernelContext shared_context,
const RelAlgExecutionUnit ra_exe_unit,
const QueryMemoryDescriptor query_mem_desc,
const ExecutorDeviceType  device_type,
std::shared_ptr< RowSetMemoryOwner row_set_mem_owner 
)
private

Definition at line 1755 of file Execute.cpp.

References anonymous_namespace{Execute.cpp}::build_row_for_empty_input(), catalog_(), DEBUG_TIMER, SharedKernelContext::getFragmentResults(), QueryMemoryDescriptor::getQueryDescriptionType(), GPU, NonGroupedAggregate, GroupByAndAggregate::shard_count_for_top_groups(), RelAlgExecutionUnit::target_exprs, and use_speculative_top_n().

1760  {
1761  auto timer = DEBUG_TIMER(__func__);
1762  auto& result_per_device = shared_context.getFragmentResults();
1763  if (result_per_device.empty() && query_mem_desc.getQueryDescriptionType() ==
1766  ra_exe_unit.target_exprs, query_mem_desc, device_type);
1767  }
1768  if (use_speculative_top_n(ra_exe_unit, query_mem_desc)) {
1769  try {
1770  return reduceSpeculativeTopN(
1771  ra_exe_unit, result_per_device, row_set_mem_owner, query_mem_desc);
1772  } catch (const std::bad_alloc&) {
1773  throw SpeculativeTopNFailed("Failed during multi-device reduction.");
1774  }
1775  }
1776  const auto shard_count =
1777  device_type == ExecutorDeviceType::GPU
1779  : 0;
1780 
1781  if (shard_count && !result_per_device.empty()) {
1782  return collectAllDeviceShardedTopResults(shared_context, ra_exe_unit);
1783  }
1784  return reduceMultiDeviceResults(
1785  ra_exe_unit, result_per_device, row_set_mem_owner, query_mem_desc);
1786 }
std::vector< Analyzer::Expr * > target_exprs
bool use_speculative_top_n(const RelAlgExecutionUnit &ra_exe_unit, const QueryMemoryDescriptor &query_mem_desc)
ResultSetPtr reduceSpeculativeTopN(const RelAlgExecutionUnit &, std::vector< std::pair< ResultSetPtr, std::vector< size_t >>> &all_fragment_results, std::shared_ptr< RowSetMemoryOwner >, const QueryMemoryDescriptor &) const
Definition: Execute.cpp:968
const Catalog_Namespace::Catalog * catalog_
Definition: Execute.h:900
ResultSetPtr reduceMultiDeviceResults(const RelAlgExecutionUnit &, std::vector< std::pair< ResultSetPtr, std::vector< size_t >>> &all_fragment_results, std::shared_ptr< RowSetMemoryOwner >, const QueryMemoryDescriptor &) const
Definition: Execute.cpp:868
ResultSetPtr collectAllDeviceShardedTopResults(SharedKernelContext &shared_context, const RelAlgExecutionUnit &ra_exe_unit) const
Definition: Execute.cpp:1870
QueryDescriptionType getQueryDescriptionType() const
ResultSetPtr build_row_for_empty_input(const std::vector< Analyzer::Expr * > &target_exprs_in, const QueryMemoryDescriptor &query_mem_desc, const ExecutorDeviceType device_type)
Definition: Execute.cpp:1718
std::vector< std::pair< ResultSetPtr, std::vector< size_t > > > & getFragmentResults()
#define DEBUG_TIMER(name)
Definition: Logger.h:313
static size_t shard_count_for_top_groups(const RelAlgExecutionUnit &ra_exe_unit, const Catalog_Namespace::Catalog &catalog)

+ Here is the call graph for this function:

ResultSetPtr Executor::collectAllDeviceShardedTopResults ( SharedKernelContext shared_context,
const RelAlgExecutionUnit ra_exe_unit 
) const
private

Definition at line 1870 of file Execute.cpp.

References CHECK(), CHECK_EQ, CHECK_LE, SharedKernelContext::getFragmentResults(), SortInfo::limit, SortInfo::offset, SortInfo::order_entries, anonymous_namespace{Execute.cpp}::permute_storage_columnar(), anonymous_namespace{Execute.cpp}::permute_storage_row_wise(), run_benchmark_import::result, and RelAlgExecutionUnit::sort_info.

1872  {
1873  auto& result_per_device = shared_context.getFragmentResults();
1874  const auto first_result_set = result_per_device.front().first;
1875  CHECK(first_result_set);
1876  auto top_query_mem_desc = first_result_set->getQueryMemDesc();
1877  CHECK(!top_query_mem_desc.hasInterleavedBinsOnGpu());
1878  const auto top_n = ra_exe_unit.sort_info.limit + ra_exe_unit.sort_info.offset;
1879  top_query_mem_desc.setEntryCount(0);
1880  for (auto& result : result_per_device) {
1881  const auto result_set = result.first;
1882  CHECK(result_set);
1883  result_set->sort(ra_exe_unit.sort_info.order_entries, top_n);
1884  size_t new_entry_cnt = top_query_mem_desc.getEntryCount() + result_set->rowCount();
1885  top_query_mem_desc.setEntryCount(new_entry_cnt);
1886  }
1887  auto top_result_set = std::make_shared<ResultSet>(first_result_set->getTargetInfos(),
1888  first_result_set->getDeviceType(),
1889  top_query_mem_desc,
1890  first_result_set->getRowSetMemOwner(),
1891  this);
1892  auto top_storage = top_result_set->allocateStorage();
1893  size_t top_output_row_idx{0};
1894  for (auto& result : result_per_device) {
1895  const auto result_set = result.first;
1896  CHECK(result_set);
1897  const auto& top_permutation = result_set->getPermutationBuffer();
1898  CHECK_LE(top_permutation.size(), top_n);
1899  if (top_query_mem_desc.didOutputColumnar()) {
1900  top_output_row_idx = permute_storage_columnar(result_set->getStorage(),
1901  result_set->getQueryMemDesc(),
1902  top_storage,
1903  top_output_row_idx,
1904  top_query_mem_desc,
1905  top_permutation);
1906  } else {
1907  top_output_row_idx = permute_storage_row_wise(result_set->getStorage(),
1908  top_storage,
1909  top_output_row_idx,
1910  top_query_mem_desc,
1911  top_permutation);
1912  }
1913  }
1914  CHECK_EQ(top_output_row_idx, top_query_mem_desc.getEntryCount());
1915  return top_result_set;
1916 }
#define CHECK_EQ(x, y)
Definition: Logger.h:205
const std::list< Analyzer::OrderEntry > order_entries
size_t permute_storage_row_wise(const ResultSetStorage *input_storage, const ResultSetStorage *output_storage, size_t output_row_index, const QueryMemoryDescriptor &output_query_mem_desc, const std::vector< uint32_t > &top_permutation)
Definition: Execute.cpp:1849
const size_t limit
CHECK(cgen_state)
const SortInfo sort_info
#define CHECK_LE(x, y)
Definition: Logger.h:208
size_t permute_storage_columnar(const ResultSetStorage *input_storage, const QueryMemoryDescriptor &input_query_mem_desc, const ResultSetStorage *output_storage, size_t output_row_index, const QueryMemoryDescriptor &output_query_mem_desc, const std::vector< uint32_t > &top_permutation)
Definition: Execute.cpp:1799
std::vector< std::pair< ResultSetPtr, std::vector< size_t > > > & getFragmentResults()
const size_t offset

+ Here is the call graph for this function:
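A conceptual sketch of the per-device top-N merge: each device's partial result is sorted and truncated to limit + offset rows, then appended into a single output buffer. Plain vectors of ints stand in for ResultSet storage, and the trailing global sort is a simplification of the permutation-based copy done above.

#include <algorithm>
#include <functional>
#include <iostream>
#include <vector>

int main() {
  const size_t limit = 3, offset = 1;
  const size_t top_n = limit + offset;
  std::vector<std::vector<int>> result_per_device = {{7, 1, 9, 4}, {8, 2, 6}};

  std::vector<int> merged;
  for (auto& rows : result_per_device) {
    std::sort(rows.begin(), rows.end(), std::greater<int>());  // per-device sort
    rows.resize(std::min(rows.size(), top_n));                 // keep only top-N
    merged.insert(merged.end(), rows.begin(), rows.end());
  }
  std::sort(merged.begin(), merged.end(), std::greater<int>());
  for (const int v : merged) {
    std::cout << v << ' ';
  }
  std::cout << '\n';
}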

bool Executor::compileBody ( const RelAlgExecutionUnit ra_exe_unit,
GroupByAndAggregate group_by_and_aggregate,
const QueryMemoryDescriptor query_mem_desc,
const CompilationOptions co,
const GpuSharedMemoryContext gpu_smem_context = {} 
)
private

Definition at line 2207 of file NativeCodegen.cpp.

2211  {
2212  // generate the code for the filter
2213  std::vector<Analyzer::Expr*> primary_quals;
2214  std::vector<Analyzer::Expr*> deferred_quals;
2215  bool short_circuited =
2216  CodeGenerator::prioritizeQuals(ra_exe_unit, primary_quals, deferred_quals);
2217  if (short_circuited) {
2218  VLOG(1) << "Prioritized " << std::to_string(primary_quals.size()) << " quals, "
2219  << "short-circuited and deferred " << std::to_string(deferred_quals.size())
2220  << " quals";
2221  }
2222  llvm::Value* filter_lv = cgen_state_->llBool(true);
2223  CodeGenerator code_generator(this);
2224  for (auto expr : primary_quals) {
2225  // Generate the filter for primary quals
2226  auto cond = code_generator.toBool(code_generator.codegen(expr, true, co).front());
2227  filter_lv = cgen_state_->ir_builder_.CreateAnd(filter_lv, cond);
2228  }
2229  CHECK(filter_lv->getType()->isIntegerTy(1));
2230  llvm::BasicBlock* sc_false{nullptr};
2231  if (!deferred_quals.empty()) {
2232  auto sc_true = llvm::BasicBlock::Create(
2233  cgen_state_->context_, "sc_true", cgen_state_->row_func_);
2234  sc_false = llvm::BasicBlock::Create(
2235  cgen_state_->context_, "sc_false", cgen_state_->row_func_);
2236  cgen_state_->ir_builder_.CreateCondBr(filter_lv, sc_true, sc_false);
2237  cgen_state_->ir_builder_.SetInsertPoint(sc_false);
2238  if (ra_exe_unit.join_quals.empty()) {
2239  cgen_state_->ir_builder_.CreateRet(cgen_state_->llInt(int32_t(0)));
2240  }
2241  cgen_state_->ir_builder_.SetInsertPoint(sc_true);
2242  filter_lv = cgen_state_->llBool(true);
2243  }
2244  for (auto expr : deferred_quals) {
2245  filter_lv = cgen_state_->ir_builder_.CreateAnd(
2246  filter_lv, code_generator.toBool(code_generator.codegen(expr, true, co).front()));
2247  }
2248 
2249  CHECK(filter_lv->getType()->isIntegerTy(1));
2250  return group_by_and_aggregate.codegen(
2251  filter_lv, sc_false, query_mem_desc, co, gpu_smem_context);
2252 }
bool codegen(llvm::Value *filter_result, llvm::BasicBlock *sc_false, const QueryMemoryDescriptor &query_mem_desc, const CompilationOptions &co, const GpuSharedMemoryContext &gpu_smem_context)
std::unique_ptr< CgenState > cgen_state_
Definition: Execute.h:855
static bool prioritizeQuals(const RelAlgExecutionUnit &ra_exe_unit, std::vector< Analyzer::Expr * > &primary_quals, std::vector< Analyzer::Expr * > &deferred_quals)
Definition: LogicalIR.cpp:157
std::string to_string(char const *&&v)
CHECK(cgen_state)
const JoinQualsPerNestingLevel join_quals
#define VLOG(n)
Definition: Logger.h:291
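The primary/deferred split exists so that cheap filters can short-circuit expensive ones: the deferred quals are only evaluated on rows that survive the primary conjunction. A scalar sketch of that evaluation order, with std::function predicates standing in for generated code:

#include <functional>
#include <iostream>
#include <vector>

bool eval_row(const std::vector<std::function<bool()>>& primary_quals,
              const std::vector<std::function<bool()>>& deferred_quals) {
  for (const auto& qual : primary_quals) {
    if (!qual()) {
      return false;  // analogous to branching to the sc_false block
    }
  }
  for (const auto& qual : deferred_quals) {
    if (!qual()) {
      return false;
    }
  }
  return true;
}

int main() {
  const auto cheap = [] { return false; };
  const auto expensive = [] {
    std::cout << "expensive qual evaluated\n";
    return true;
  };
  std::cout << eval_row({cheap}, {expensive}) << "\n";  // prints 0; expensive never runs
}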
std::tuple< CompilationResult, std::unique_ptr< QueryMemoryDescriptor > > Executor::compileWorkUnit ( const std::vector< InputTableInfo > &  query_infos,
const RelAlgExecutionUnit ra_exe_unit,
const CompilationOptions co,
const ExecutionOptions eo,
const CudaMgr_Namespace::CudaMgr cuda_mgr,
const bool  allow_lazy_fetch,
std::shared_ptr< RowSetMemoryOwner row_set_mem_owner,
const size_t  max_groups_buffer_entry_guess,
const int8_t  crt_min_byte_width,
const bool  has_cardinality_estimation,
ColumnCacheMap column_cache,
RenderInfo render_info = nullptr 
)
private

Definition at line 1868 of file NativeCodegen.cpp.

1879  {
1880  auto timer = DEBUG_TIMER(__func__);
1881  nukeOldState(allow_lazy_fetch, query_infos, &ra_exe_unit);
1882 
1883  GroupByAndAggregate group_by_and_aggregate(
1884  this, co.device_type, ra_exe_unit, query_infos, row_set_mem_owner);
1885  auto query_mem_desc =
1886  group_by_and_aggregate.initQueryMemoryDescriptor(eo.allow_multifrag,
1887  max_groups_buffer_entry_guess,
1888  crt_min_byte_width,
1889  render_info,
1891 
1892  if (query_mem_desc->getQueryDescriptionType() ==
1894  !has_cardinality_estimation &&
1895  (!render_info || !render_info->isPotentialInSituRender()) && !eo.just_explain) {
1897  }
1898 
1899  const bool output_columnar = query_mem_desc->didOutputColumnar();
1900  const bool gpu_shared_mem_optimization =
1902  ra_exe_unit,
1903  cuda_mgr,
1904  co.device_type,
1905  cuda_mgr ? this->blockSize() : 1,
1906  cuda_mgr ? this->numBlocksPerMP() : 1);
1907  if (gpu_shared_mem_optimization) {
1908  // disable interleaved bins optimization on the GPU
1909  query_mem_desc->setHasInterleavedBinsOnGpu(false);
1910  LOG(DEBUG1) << "GPU shared memory is used for the " +
1911  query_mem_desc->queryDescTypeToString() + " query(" +
1912  std::to_string(get_shared_memory_size(gpu_shared_mem_optimization,
1913  query_mem_desc.get())) +
1914  " out of " + std::to_string(g_gpu_smem_threshold) + " bytes).";
1915  }
1916 
1917  const GpuSharedMemoryContext gpu_smem_context(
1918  get_shared_memory_size(gpu_shared_mem_optimization, query_mem_desc.get()));
1919 
1921  const size_t num_count_distinct_descs =
1922  query_mem_desc->getCountDistinctDescriptorsSize();
1923  for (size_t i = 0; i < num_count_distinct_descs; i++) {
1924  const auto& count_distinct_descriptor =
1925  query_mem_desc->getCountDistinctDescriptor(i);
1926  if (count_distinct_descriptor.impl_type_ == CountDistinctImplType::StdSet ||
1927  (count_distinct_descriptor.impl_type_ != CountDistinctImplType::Invalid &&
1928  !co.hoist_literals)) {
1929  throw QueryMustRunOnCpu();
1930  }
1931  }
1932  }
1933 
1934  // Read the module template and target either CPU or GPU
1935  // by binding the stream position functions to the right implementation:
1936  // stride access for GPU, contiguous for CPU
1937  auto rt_module_copy = llvm::CloneModule(
1938 #if LLVM_VERSION_MAJOR >= 7
1939  *g_rt_module.get(),
1940 #else
1941  g_rt_module.get(),
1942 #endif
1943  cgen_state_->vmap_,
1944  [](const llvm::GlobalValue* gv) {
1945  auto func = llvm::dyn_cast<llvm::Function>(gv);
1946  if (!func) {
1947  return true;
1948  }
1949  return (func->getLinkage() == llvm::GlobalValue::LinkageTypes::PrivateLinkage ||
1950  func->getLinkage() == llvm::GlobalValue::LinkageTypes::InternalLinkage ||
1952  });
1953 
1955  if (is_udf_module_present(true)) {
1957  }
1958  if (is_rt_udf_module_present(true)) {
1960  rt_udf_cpu_module, *rt_module_copy, cgen_state_.get());
1961  }
1962  } else {
1963  rt_module_copy->setDataLayout(get_gpu_data_layout());
1964  rt_module_copy->setTargetTriple(get_gpu_target_triple_string());
1965 
1966  if (is_udf_module_present()) {
1967  llvm::Triple gpu_triple(udf_gpu_module->getTargetTriple());
1968 
1969  if (!gpu_triple.isNVPTX()) {
1970  throw QueryMustRunOnCpu();
1971  }
1972 
1974  }
1975  if (is_rt_udf_module_present()) {
1977  rt_udf_gpu_module, *rt_module_copy, cgen_state_.get());
1978  }
1979  }
1980 
1981  cgen_state_->module_ = rt_module_copy.release();
1982 
1983  auto agg_fnames =
1984  get_agg_fnames(ra_exe_unit.target_exprs, !ra_exe_unit.groupby_exprs.empty());
1985 
1986  const auto agg_slot_count = ra_exe_unit.estimator ? size_t(1) : agg_fnames.size();
1987 
1988  const bool is_group_by{query_mem_desc->isGroupBy()};
1989  auto query_func = is_group_by ? query_group_by_template(cgen_state_->module_,
1990  co.hoist_literals,
1991  *query_mem_desc,
1992  co.device_type,
1993  ra_exe_unit.scan_limit,
1994  gpu_smem_context)
1996  agg_slot_count,
1997  co.hoist_literals,
1998  !!ra_exe_unit.estimator,
1999  gpu_smem_context);
2000  bind_pos_placeholders("pos_start", true, query_func, cgen_state_->module_);
2001  bind_pos_placeholders("group_buff_idx", false, query_func, cgen_state_->module_);
2002  bind_pos_placeholders("pos_step", false, query_func, cgen_state_->module_);
2003 
2004  cgen_state_->query_func_ = query_func;
2005  cgen_state_->query_func_entry_ir_builder_.SetInsertPoint(
2006  &query_func->getEntryBlock().front());
2007 
2008  std::vector<llvm::Value*> col_heads;
2009  std::tie(cgen_state_->row_func_, col_heads) =
2010  create_row_function(ra_exe_unit.input_col_descs.size(),
2011  is_group_by ? 0 : agg_slot_count,
2012  co.hoist_literals,
2013  query_func,
2014  cgen_state_->module_,
2015  cgen_state_->context_);
2016  CHECK(cgen_state_->row_func_);
2017  // make sure it's inlined; we don't want register spills in the inner loop
2019  auto bb =
2020  llvm::BasicBlock::Create(cgen_state_->context_, "entry", cgen_state_->row_func_);
2021  cgen_state_->ir_builder_.SetInsertPoint(bb);
2022  preloadFragOffsets(ra_exe_unit.input_descs, query_infos);
2023  RelAlgExecutionUnit body_execution_unit = ra_exe_unit;
2024  const auto join_loops =
2025  buildJoinLoops(body_execution_unit, co, eo, query_infos, column_cache);
2026  plan_state_->allocateLocalColumnIds(ra_exe_unit.input_col_descs);
2027  const auto is_not_deleted_bb = codegenSkipDeletedOuterTableRow(ra_exe_unit, co);
2028  if (is_not_deleted_bb) {
2029  bb = is_not_deleted_bb;
2030  }
2031  if (!join_loops.empty()) {
2032  codegenJoinLoops(join_loops,
2033  body_execution_unit,
2034  group_by_and_aggregate,
2035  query_func,
2036  bb,
2037  *(query_mem_desc.get()),
2038  co,
2039  eo);
2040  } else {
2041  const bool can_return_error = compileBody(
2042  ra_exe_unit, group_by_and_aggregate, *query_mem_desc, co, gpu_smem_context);
2043  if (can_return_error || cgen_state_->needs_error_check_ || eo.with_dynamic_watchdog ||
2045  createErrorCheckControlFlow(query_func,
2048  co.device_type);
2049  }
2050  }
2051  std::vector<llvm::Value*> hoisted_literals;
2052 
2053  if (co.hoist_literals) {
2054  VLOG(1) << "number of hoisted literals: "
2055  << cgen_state_->query_func_literal_loads_.size()
2056  << " / literal buffer usage: " << cgen_state_->getLiteralBufferUsage(0)
2057  << " bytes";
2058  }
2059 
2060  if (co.hoist_literals && !cgen_state_->query_func_literal_loads_.empty()) {
2061  // we have some hoisted literals...
2062  hoisted_literals = inlineHoistedLiterals();
2063  }
2064  // iterate through all the instruction in the query template function and
2065  // replace the call to the filter placeholder with the call to the actual filter
2066  for (auto it = llvm::inst_begin(query_func), e = llvm::inst_end(query_func); it != e;
2067  ++it) {
2068  if (!llvm::isa<llvm::CallInst>(*it)) {
2069  continue;
2070  }
2071  auto& filter_call = llvm::cast<llvm::CallInst>(*it);
2072  if (std::string(filter_call.getCalledFunction()->getName()) == "row_process") {
2073  std::vector<llvm::Value*> args;
2074  for (size_t i = 0; i < filter_call.getNumArgOperands(); ++i) {
2075  args.push_back(filter_call.getArgOperand(i));
2076  }
2077  args.insert(args.end(), col_heads.begin(), col_heads.end());
2078  args.push_back(get_arg_by_name(query_func, "join_hash_tables"));
2079  // push hoisted literals arguments, if any
2080  args.insert(args.end(), hoisted_literals.begin(), hoisted_literals.end());
2081 
2082  llvm::ReplaceInstWithInst(&filter_call,
2083  llvm::CallInst::Create(cgen_state_->row_func_, args, ""));
2084  break;
2085  }
2086  }
2087 
2088  plan_state_->init_agg_vals_ =
2089  init_agg_val_vec(ra_exe_unit.target_exprs, ra_exe_unit.quals, *query_mem_desc);
2090 
2091  /*
2092  * If we have decided to use GPU shared memory (decision is not made here), then
2093  * we generate proper code for extra components that it needs (buffer initialization and
2094  * gpu reduction from shared memory to global memory). We then inject these functions
2095  * into the already compiled query_func (replacing two placeholders, write_back_nop and
2096  * init_smem_nop). The rest of the code should be as before (row_func, etc.).
2097  */
2098  if (gpu_smem_context.isSharedMemoryUsed()) {
2099  if (query_mem_desc->getQueryDescriptionType() ==
2101  GpuSharedMemCodeBuilder gpu_smem_code(
2102  cgen_state_->module_,
2103  cgen_state_->context_,
2104  *query_mem_desc,
2106  plan_state_->init_agg_vals_);
2107  gpu_smem_code.codegen();
2108  gpu_smem_code.injectFunctionsInto(query_func);
2109 
2110  // helper functions are used for caching purposes later
2111  cgen_state_->helper_functions_.push_back(gpu_smem_code.getReductionFunction());
2112  cgen_state_->helper_functions_.push_back(gpu_smem_code.getInitFunction());
2113  LOG(IR) << gpu_smem_code.toString();
2114  }
2115  }
2116 
2117  auto multifrag_query_func = cgen_state_->module_->getFunction(
2118  "multifrag_query" + std::string(co.hoist_literals ? "_hoisted_literals" : ""));
2119  CHECK(multifrag_query_func);
2120 
2121  bind_query(query_func,
2122  "query_stub" + std::string(co.hoist_literals ? "_hoisted_literals" : ""),
2123  multifrag_query_func,
2124  cgen_state_->module_);
2125 
2126  auto live_funcs =
2128  {query_func, cgen_state_->row_func_},
2129  {multifrag_query_func});
2130 
2131  std::string llvm_ir;
2132  if (eo.just_explain) {
2134 #ifdef WITH_JIT_DEBUG
2135  throw std::runtime_error(
2136  "Explain optimized not available when JIT runtime debug symbols are enabled");
2137 #else
2138  // Note that we don't run the NVVM reflect pass here. Use LOG(IR) to get the
2139  // optimized IR after NVVM reflect
2140  llvm::legacy::PassManager pass_manager;
2141  optimize_ir(query_func, cgen_state_->module_, pass_manager, live_funcs, co);
2142 #endif // WITH_JIT_DEBUG
2143  }
2144  llvm_ir =
2145  serialize_llvm_object(query_func) + serialize_llvm_object(cgen_state_->row_func_);
2146  }
2147  verify_function_ir(cgen_state_->row_func_);
2148 
2149  LOG(IR) << query_mem_desc->toString() << "\nGenerated IR\n"
2150  << serialize_llvm_object(query_func)
2151  << serialize_llvm_object(cgen_state_->row_func_) << "\nEnd of IR";
2152 
2153  return std::make_tuple(
2156  ? optimizeAndCodegenCPU(query_func, multifrag_query_func, live_funcs, co)
2157  : optimizeAndCodegenGPU(query_func,
2158  multifrag_query_func,
2159  live_funcs,
2160  is_group_by || ra_exe_unit.estimator,
2161  cuda_mgr,
2162  co),
2163  cgen_state_->getLiterals(),
2164  output_columnar,
2165  llvm_ir,
2166  std::move(gpu_smem_context)},
2167  std::move(query_mem_desc));
2168 }
std::vector< Analyzer::Expr * > target_exprs
std::unique_ptr< llvm::Module > rt_udf_cpu_module
void codegenJoinLoops(const std::vector< JoinLoop > &join_loops, const RelAlgExecutionUnit &ra_exe_unit, GroupByAndAggregate &group_by_and_aggregate, llvm::Function *query_func, llvm::BasicBlock *entry_bb, const QueryMemoryDescriptor &query_mem_desc, const CompilationOptions &co, const ExecutionOptions &eo)
Definition: IRCodegen.cpp:547
std::unique_ptr< llvm::Module > udf_gpu_module
#define LOG(tag)
Definition: Logger.h:188
std::unique_ptr< llvm::Module > rt_udf_gpu_module
void mark_function_always_inline(llvm::Function *func)
llvm::StringRef get_gpu_data_layout()
bool is_udf_module_present(bool cpu_only=false)
std::vector< InputDescriptor > input_descs
llvm::Function * query_template(llvm::Module *module, const size_t aggr_col_count, const bool hoist_literals, const bool is_estimate_query, const GpuSharedMemoryContext &gpu_smem_context)
std::unique_ptr< CgenState > cgen_state_
Definition: Execute.h:855
const std::list< std::shared_ptr< Analyzer::Expr > > groupby_exprs
unsigned numBlocksPerMP() const
Definition: Execute.cpp:3120
void optimize_ir(llvm::Function *query_func, llvm::Module *module, llvm::legacy::PassManager &pass_manager, const std::unordered_set< llvm::Function * > &live_funcs, const CompilationOptions &co)
std::vector< std::string > get_agg_fnames(const std::vector< Analyzer::Expr * > &target_exprs, const bool is_group_by)
std::string to_string(char const *&&v)
void preloadFragOffsets(const std::vector< InputDescriptor > &input_descs, const std::vector< InputTableInfo > &query_infos)
Definition: Execute.cpp:3054
bool is_gpu_shared_mem_supported(const QueryMemoryDescriptor *query_mem_desc_ptr, const RelAlgExecutionUnit &ra_exe_unit, const CudaMgr_Namespace::CudaMgr *cuda_mgr, const ExecutorDeviceType device_type, const unsigned gpu_blocksize, const unsigned num_blocks_per_mp)
llvm::StringRef get_gpu_target_triple_string()
bool compileBody(const RelAlgExecutionUnit &ra_exe_unit, GroupByAndAggregate &group_by_and_aggregate, const QueryMemoryDescriptor &query_mem_desc, const CompilationOptions &co, const GpuSharedMemoryContext &gpu_smem_context={})
void verify_function_ir(const llvm::Function *func)
const bool allow_multifrag
CHECK(cgen_state)
llvm::Value * get_arg_by_name(llvm::Function *func, const std::string &name)
Definition: Execute.h:95
std::shared_ptr< CompilationContext > optimizeAndCodegenGPU(llvm::Function *, llvm::Function *, std::unordered_set< llvm::Function * > &, const bool no_inline, const CudaMgr_Namespace::CudaMgr *cuda_mgr, const CompilationOptions &)
static std::unordered_set< llvm::Function * > markDeadRuntimeFuncs(llvm::Module &module, const std::vector< llvm::Function * > &roots, const std::vector< llvm::Function * > &leaves)
const bool with_dynamic_watchdog
std::unique_ptr< llvm::Module > g_rt_module
ExecutorExplainType explain_type
std::shared_ptr< CompilationContext > optimizeAndCodegenCPU(llvm::Function *, llvm::Function *, const std::unordered_set< llvm::Function * > &, const CompilationOptions &)
std::unique_ptr< PlanState > plan_state_
Definition: Execute.h:870
void nukeOldState(const bool allow_lazy_fetch, const std::vector< InputTableInfo > &query_infos, const RelAlgExecutionUnit *ra_exe_unit)
Definition: Execute.cpp:3038
static void link_udf_module(const std::unique_ptr< llvm::Module > &udf_module, llvm::Module &module, CgenState *cgen_state, llvm::Linker::Flags flags=llvm::Linker::Flags::None)
const std::shared_ptr< Analyzer::Estimator > estimator
ExecutorDeviceType device_type
llvm::BasicBlock * codegenSkipDeletedOuterTableRow(const RelAlgExecutionUnit &ra_exe_unit, const CompilationOptions &co)
std::string serialize_llvm_object(const T *llvm_obj)
static bool alwaysCloneRuntimeFunction(const llvm::Function *func)
void createErrorCheckControlFlow(llvm::Function *query_func, bool run_with_dynamic_watchdog, bool run_with_allowing_runtime_interrupt, ExecutorDeviceType device_type)
std::unique_ptr< llvm::Module > udf_cpu_module
void bind_pos_placeholders(const std::string &pos_fn_name, const bool use_resume_param, llvm::Function *query_func, llvm::Module *module)
std::list< std::shared_ptr< Analyzer::Expr > > quals
bool isPotentialInSituRender() const
Definition: RenderInfo.cpp:64
#define DEBUG_TIMER(name)
Definition: Logger.h:313
std::vector< llvm::Value * > inlineHoistedLiterals()
std::vector< TargetInfo > target_exprs_to_infos(const std::vector< Analyzer::Expr * > &targets, const QueryMemoryDescriptor &query_mem_desc)
std::list< std::shared_ptr< const InputColDescriptor > > input_col_descs
std::vector< JoinLoop > buildJoinLoops(RelAlgExecutionUnit &ra_exe_unit, const CompilationOptions &co, const ExecutionOptions &eo, const std::vector< InputTableInfo > &query_infos, ColumnCacheMap &column_cache)
Definition: IRCodegen.cpp:256
unsigned blockSize() const
Definition: Execute.cpp:3128
const bool allow_runtime_query_interrupt
std::vector< int64_t > init_agg_val_vec(const std::vector< TargetInfo > &targets, const QueryMemoryDescriptor &query_mem_desc)
bool is_rt_udf_module_present(bool cpu_only=false)
llvm::Function * query_group_by_template(llvm::Module *module, const bool hoist_literals, const QueryMemoryDescriptor &query_mem_desc, const ExecutorDeviceType device_type, const bool check_scan_limit, const GpuSharedMemoryContext &gpu_smem_context)
#define VLOG(n)
Definition: Logger.h:291
std::pair< llvm::Function *, std::vector< llvm::Value * > > create_row_function(const size_t in_col_count, const size_t agg_col_count, const bool hoist_literals, llvm::Function *query_func, llvm::Module *module, llvm::LLVMContext &context)
size_t get_shared_memory_size(const bool shared_mem_used, const QueryMemoryDescriptor *query_mem_desc_ptr)
void bind_query(llvm::Function *query_func, const std::string &query_fname, llvm::Function *multifrag_query_func, llvm::Module *module)
size_t g_gpu_smem_threshold
Definition: Execute.cpp:111
AggregatedColRange Executor::computeColRangesCache ( const std::unordered_set< PhysicalInput > &  phys_inputs)
private

Definition at line 3447 of file Execute.cpp.

References catalog_(), CHECK(), getLeafColumnRange(), AggregatedColRange::setColRange(), and ExpressionRange::typeSupportsRange().

3448  {
3449  AggregatedColRange agg_col_range_cache;
3450  CHECK(catalog_);
3451  std::unordered_set<int> phys_table_ids;
3452  for (const auto& phys_input : phys_inputs) {
3453  phys_table_ids.insert(phys_input.table_id);
3454  }
3455  std::vector<InputTableInfo> query_infos;
3456  for (const int table_id : phys_table_ids) {
3457  query_infos.emplace_back(InputTableInfo{table_id, getTableInfo(table_id)});
3458  }
3459  for (const auto& phys_input : phys_inputs) {
3460  const auto cd =
3461  catalog_->getMetadataForColumn(phys_input.table_id, phys_input.col_id);
3462  CHECK(cd);
3463  if (ExpressionRange::typeSupportsRange(cd->columnType)) {
3464  const auto col_var = boost::make_unique<Analyzer::ColumnVar>(
3465  cd->columnType, phys_input.table_id, phys_input.col_id, 0);
3466  const auto col_range = getLeafColumnRange(col_var.get(), query_infos, this, false);
3467  agg_col_range_cache.setColRange(phys_input, col_range);
3468  }
3469  }
3470  return agg_col_range_cache;
3471 }
Fragmenter_Namespace::TableInfo getTableInfo(const int table_id) const
Definition: Execute.cpp:263
CHECK(cgen_state)
const Catalog_Namespace::Catalog * catalog_
Definition: Execute.h:900
const ColumnDescriptor * getMetadataForColumn(int tableId, const std::string &colName) const
ExpressionRange getLeafColumnRange(const Analyzer::ColumnVar *col_expr, const std::vector< InputTableInfo > &query_infos, const Executor *executor, const bool is_outer_join_proj)
void setColRange(const PhysicalInput &, const ExpressionRange &)
static bool typeSupportsRange(const SQLTypeInfo &ti)

+ Here is the call graph for this function:

StringDictionaryGenerations Executor::computeStringDictionaryGenerations ( const std::unordered_set< PhysicalInput > &  phys_inputs)
private

Definition at line 3473 of file Execute.cpp.

References catalog_(), CHECK(), kENCODING_DICT, and StringDictionaryGenerations::setGeneration().

3474  {
3475  StringDictionaryGenerations string_dictionary_generations;
3476  CHECK(catalog_);
3477  for (const auto& phys_input : phys_inputs) {
3478  const auto cd =
3479  catalog_->getMetadataForColumn(phys_input.table_id, phys_input.col_id);
3480  CHECK(cd);
3481  const auto& col_ti =
3482  cd->columnType.is_array() ? cd->columnType.get_elem_type() : cd->columnType;
3483  if (col_ti.is_string() && col_ti.get_compression() == kENCODING_DICT) {
3484  const int dict_id = col_ti.get_comp_param();
3485  const auto dd = catalog_->getMetadataForDict(dict_id);
3486  CHECK(dd && dd->stringDict);
3487  string_dictionary_generations.setGeneration(dict_id,
3488  dd->stringDict->storageEntryCount());
3489  }
3490  }
3491  return string_dictionary_generations;
3492 }
CHECK(cgen_state)
const Catalog_Namespace::Catalog * catalog_
Definition: Execute.h:900
const ColumnDescriptor * getMetadataForColumn(int tableId, const std::string &colName) const
void setGeneration(const uint32_t id, const size_t generation)
const DictDescriptor * getMetadataForDict(int dict_ref, bool loadDict=true) const
Definition: Catalog.cpp:1449

+ Here is the call graph for this function:

TableGenerations Executor::computeTableGenerations ( std::unordered_set< int >  phys_table_ids)
private

Definition at line 3494 of file Execute.cpp.

References TableGenerations::setGeneration().

3495  {
3496  TableGenerations table_generations;
3497  for (const int table_id : phys_table_ids) {
3498  const auto table_info = getTableInfo(table_id);
3499  table_generations.setGeneration(
3500  table_id, TableGeneration{table_info.getPhysicalNumTuples(), 0});
3501  }
3502  return table_generations;
3503 }
Fragmenter_Namespace::TableInfo getTableInfo(const int table_id) const
Definition: Execute.cpp:263
void setGeneration(const uint32_t id, const TableGeneration &generation)

+ Here is the call graph for this function:
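Each table's generation is seeded from its current physical tuple count, with a zero start offset, exactly as in the setGeneration call above. A minimal sketch with a local TableGeneration stand-in struct (not the engine's own type):

#include <cstdint>
#include <iostream>
#include <unordered_map>

struct TableGeneration {
  int64_t tuple_count;
  int64_t start_rowid;
};

int main() {
  // table_id -> physical tuple count, as getTableInfo() would report it
  const std::unordered_map<int, int64_t> physical_num_tuples = {{1, 1000}, {2, 250}};

  std::unordered_map<int, TableGeneration> table_generations;
  for (const auto& [table_id, num_tuples] : physical_num_tuples) {
    table_generations[table_id] = TableGeneration{num_tuples, 0};
  }
  std::cout << table_generations[1].tuple_count << "\n";  // 1000
}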

bool Executor::containsLeftDeepOuterJoin ( ) const
inline

Definition at line 338 of file Execute.h.

References cgen_state_.

338  {
339  return cgen_state_->contains_left_deep_outer_join_;
340  }
std::unique_ptr< CgenState > cgen_state_
Definition: Execute.h:855
void Executor::createErrorCheckControlFlow ( llvm::Function *  query_func,
bool  run_with_dynamic_watchdog,
bool  run_with_allowing_runtime_interrupt,
ExecutorDeviceType  device_type 
)
private

Definition at line 1449 of file NativeCodegen.cpp.

1452  {
1453  // check whether the row processing was successful; currently, it can
1454  // fail by running out of group by buffer slots
1455 
1456  if (run_with_dynamic_watchdog && run_with_allowing_runtime_interrupt) {
1457  // when both the dynamic watchdog and the runtime interrupt are enabled,
1458  // we use the dynamic watchdog
1459  run_with_allowing_runtime_interrupt = false;
1460  }
1461 
1462  llvm::Value* row_count = nullptr;
1463  if ((run_with_dynamic_watchdog || run_with_allowing_runtime_interrupt) &&
1464  device_type == ExecutorDeviceType::GPU) {
1465  row_count =
1466  find_variable_in_basic_block<llvm::LoadInst>(query_func, ".entry", "row_count");
1467  }
1468 
1469  bool done_splitting = false;
1470  for (auto bb_it = query_func->begin(); bb_it != query_func->end() && !done_splitting;
1471  ++bb_it) {
1472  llvm::Value* pos = nullptr;
1473  for (auto inst_it = bb_it->begin(); inst_it != bb_it->end(); ++inst_it) {
1474  if ((run_with_dynamic_watchdog || run_with_allowing_runtime_interrupt) &&
1475  llvm::isa<llvm::PHINode>(*inst_it)) {
1476  if (inst_it->getName() == "pos") {
1477  pos = &*inst_it;
1478  }
1479  continue;
1480  }
1481  if (!llvm::isa<llvm::CallInst>(*inst_it)) {
1482  continue;
1483  }
1484  auto& filter_call = llvm::cast<llvm::CallInst>(*inst_it);
1485  if (std::string(filter_call.getCalledFunction()->getName()) == "row_process") {
1486  auto next_inst_it = inst_it;
1487  ++next_inst_it;
1488  auto new_bb = bb_it->splitBasicBlock(next_inst_it);
1489  auto& br_instr = bb_it->back();
1490  llvm::IRBuilder<> ir_builder(&br_instr);
1491  llvm::Value* err_lv = &*inst_it;
1492  if (run_with_dynamic_watchdog) {
1493  CHECK(pos);
1494  llvm::Value* call_watchdog_lv = nullptr;
1495  if (device_type == ExecutorDeviceType::GPU) {
1496  // In order to make sure all threads within a block see the same barrier,
1497  // only blocks in which no thread has crossed the critical
1498  // edge go through the dynamic watchdog computation
1499  CHECK(row_count);
1500  auto crit_edge_rem =
1501  (blockSize() & (blockSize() - 1))
1502  ? ir_builder.CreateSRem(
1503  row_count,
1504  cgen_state_->llInt(static_cast<int64_t>(blockSize())))
1505  : ir_builder.CreateAnd(
1506  row_count,
1507  cgen_state_->llInt(static_cast<int64_t>(blockSize() - 1)));
1508  auto crit_edge_threshold = ir_builder.CreateSub(row_count, crit_edge_rem);
1509  crit_edge_threshold->setName("crit_edge_threshold");
1510 
1511  // only those threads where pos < crit_edge_threshold go through dynamic
1512  // watchdog call
1513  call_watchdog_lv =
1514  ir_builder.CreateICmp(llvm::ICmpInst::ICMP_SLT, pos, crit_edge_threshold);
1515  } else {
1516  // CPU path: run watchdog for every 64th row
1517  auto dw_predicate = ir_builder.CreateAnd(pos, uint64_t(0x3f));
1518  call_watchdog_lv = ir_builder.CreateICmp(
1519  llvm::ICmpInst::ICMP_EQ, dw_predicate, cgen_state_->llInt(int64_t(0LL)));
1520  }
1521  CHECK(call_watchdog_lv);
1522  auto error_check_bb = bb_it->splitBasicBlock(
1523  llvm::BasicBlock::iterator(br_instr), ".error_check");
1524  auto& watchdog_br_instr = bb_it->back();
1525 
1526  auto watchdog_check_bb = llvm::BasicBlock::Create(
1527  cgen_state_->context_, ".watchdog_check", query_func, error_check_bb);
1528  llvm::IRBuilder<> watchdog_ir_builder(watchdog_check_bb);
1529  auto detected_timeout = watchdog_ir_builder.CreateCall(
1530  cgen_state_->module_->getFunction("dynamic_watchdog"), {});
1531  auto timeout_err_lv = watchdog_ir_builder.CreateSelect(
1532  detected_timeout, cgen_state_->llInt(Executor::ERR_OUT_OF_TIME), err_lv);
1533  watchdog_ir_builder.CreateBr(error_check_bb);
1534 
1535  llvm::ReplaceInstWithInst(
1536  &watchdog_br_instr,
1537  llvm::BranchInst::Create(
1538  watchdog_check_bb, error_check_bb, call_watchdog_lv));
1539  ir_builder.SetInsertPoint(&br_instr);
1540  auto unified_err_lv = ir_builder.CreatePHI(err_lv->getType(), 2);
1541 
1542  unified_err_lv->addIncoming(timeout_err_lv, watchdog_check_bb);
1543  unified_err_lv->addIncoming(err_lv, &*bb_it);
1544  err_lv = unified_err_lv;
1545  } else if (run_with_allowing_runtime_interrupt) {
1546  CHECK(pos);
1547  llvm::Value* call_check_interrupt_lv = nullptr;
1548  if (device_type == ExecutorDeviceType::GPU) {
1549  // approximate how many times the %pos variable is increased,
1550  // i.e., the number of iterations
1551  int32_t num_shift_by_gridDim = getExpOfTwo(gridSize());
1552  int32_t num_shift_by_blockDim = getExpOfTwo(blockSize());
1553  if (!isPowOfTwo(gridSize())) {
1554  num_shift_by_gridDim++;
1555  }
1556  if (!isPowOfTwo(blockSize())) {
1557  num_shift_by_blockDim++;
1558  }
1559  int total_num_shift = num_shift_by_gridDim + num_shift_by_blockDim;
1560  // check the interrupt flag for every 64th iteration
1561  llvm::Value* pos_shifted_per_iteration =
1562  ir_builder.CreateLShr(pos, cgen_state_->llInt(total_num_shift));
1563  auto interrupt_predicate =
1564  ir_builder.CreateAnd(pos_shifted_per_iteration, uint64_t(0x3f));
1565  call_check_interrupt_lv =
1566  ir_builder.CreateICmp(llvm::ICmpInst::ICMP_EQ,
1567  interrupt_predicate,
1568  cgen_state_->llInt(int64_t(0LL)));
1569  } else {
1570  // CPU path: run interrupt checker for every 64th row
1571  auto interrupt_predicate = ir_builder.CreateAnd(pos, uint64_t(0x3f));
1572  call_check_interrupt_lv =
1573  ir_builder.CreateICmp(llvm::ICmpInst::ICMP_EQ,
1574  interrupt_predicate,
1575  cgen_state_->llInt(int64_t(0LL)));
1576  }
1577  CHECK(call_check_interrupt_lv);
1578  auto error_check_bb = bb_it->splitBasicBlock(
1579  llvm::BasicBlock::iterator(br_instr), ".error_check");
1580  auto& check_interrupt_br_instr = bb_it->back();
1581 
1582  auto interrupt_check_bb = llvm::BasicBlock::Create(
1583  cgen_state_->context_, ".interrupt_check", query_func, error_check_bb);
1584  llvm::IRBuilder<> interrupt_checker_ir_builder(interrupt_check_bb);
1585  auto detected_interrupt = interrupt_checker_ir_builder.CreateCall(
1586  cgen_state_->module_->getFunction("check_interrupt"), {});
1587  auto interrupt_err_lv = interrupt_checker_ir_builder.CreateSelect(
1588  detected_interrupt, cgen_state_->llInt(Executor::ERR_INTERRUPTED), err_lv);
1589  interrupt_checker_ir_builder.CreateBr(error_check_bb);
1590 
1591  llvm::ReplaceInstWithInst(
1592  &check_interrupt_br_instr,
1593  llvm::BranchInst::Create(
1594  interrupt_check_bb, error_check_bb, call_check_interrupt_lv));
1595  ir_builder.SetInsertPoint(&br_instr);
1596  auto unified_err_lv = ir_builder.CreatePHI(err_lv->getType(), 2);
1597 
1598  unified_err_lv->addIncoming(interrupt_err_lv, interrupt_check_bb);
1599  unified_err_lv->addIncoming(err_lv, &*bb_it);
1600  err_lv = unified_err_lv;
1601  }
1602  const auto error_code_arg = get_arg_by_name(query_func, "error_code");
1603  err_lv =
1604  ir_builder.CreateCall(cgen_state_->module_->getFunction("record_error_code"),
1605  std::vector<llvm::Value*>{err_lv, error_code_arg});
1606  if (device_type == ExecutorDeviceType::GPU) {
1607  // let kernel execution finish as expected, regardless of the observed error,
1608  // unless it is from the dynamic watchdog where all threads within that block
1609  // return together.
1610  if (run_with_allowing_runtime_interrupt) {
1611  err_lv = ir_builder.CreateICmp(llvm::ICmpInst::ICMP_EQ,
1612  err_lv,
1614  } else {
1615  err_lv = ir_builder.CreateICmp(llvm::ICmpInst::ICMP_EQ,
1616  err_lv,
1618  }
1619 
1620  } else {
1621  err_lv = ir_builder.CreateICmp(llvm::ICmpInst::ICMP_NE,
1622  err_lv,
1623  cgen_state_->llInt(static_cast<int32_t>(0)));
1624  }
1625  auto error_bb = llvm::BasicBlock::Create(
1626  cgen_state_->context_, ".error_exit", query_func, new_bb);
1627  llvm::ReturnInst::Create(cgen_state_->context_, error_bb);
1628  llvm::ReplaceInstWithInst(&br_instr,
1629  llvm::BranchInst::Create(error_bb, new_bb, err_lv));
1630  done_splitting = true;
1631  break;
1632  }
1633  }
1634  }
1635  CHECK(done_splitting);
1636 }
static const int32_t ERR_INTERRUPTED
Definition: Execute.h:943
std::unique_ptr< CgenState > cgen_state_
Definition: Execute.h:855
unsigned getExpOfTwo(unsigned n)
Definition: MathUtils.h:24
bool isPowOfTwo(unsigned n)
Definition: MathUtils.h:20
CHECK(cgen_state)
llvm::Value * get_arg_by_name(llvm::Function *func, const std::string &name)
Definition: Execute.h:95
static const int32_t ERR_OUT_OF_TIME
Definition: Execute.h:942
unsigned gridSize() const
Definition: Execute.cpp:3113
unsigned blockSize() const
Definition: Execute.cpp:3128
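
The interrupt-check cadence built above amounts to sampling the loop counter: on GPU, %pos advances by gridSize() * blockSize() per iteration, so shifting it right by roughly log2(gridSize()) + log2(blockSize()) (rounded up for non-power-of-two dimensions) approximates the iteration count, and masking with 0x3f fires the check about once every 64 iterations. A minimal host-side sketch of the same arithmetic follows; would_check_interrupt, exp_of_two and is_pow_of_two are illustrative stand-ins, not the generated IR or OmniSciDB helpers.

#include <cstdint>

// Illustrative sketch of the shift-and-mask predicate built by
// createErrorCheckControlFlow() for the GPU runtime-interrupt check.
// grid_size/block_size stand in for gridSize()/blockSize().
bool would_check_interrupt(uint64_t pos, unsigned grid_size, unsigned block_size) {
  auto exp_of_two = [](unsigned n) {  // floor(log2(n)) for n > 0
    unsigned e = 0;
    while (n >>= 1) {
      ++e;
    }
    return e;
  };
  auto is_pow_of_two = [](unsigned n) { return n && !(n & (n - 1)); };
  unsigned shift = exp_of_two(grid_size) + exp_of_two(block_size);
  if (!is_pow_of_two(grid_size)) {
    ++shift;  // round the grid contribution up, as the generated code does
  }
  if (!is_pow_of_two(block_size)) {
    ++shift;  // likewise for the block dimension
  }
  const uint64_t approx_iteration = pos >> shift;
  return (approx_iteration & 0x3f) == 0;  // roughly every 64th iteration
}
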
std::vector< std::unique_ptr< ExecutionKernel > > Executor::createKernels ( SharedKernelContext shared_context,
const RelAlgExecutionUnit ra_exe_unit,
ColumnFetcher column_fetcher,
const std::vector< InputTableInfo > &  table_infos,
const ExecutionOptions eo,
const bool  is_agg,
const bool  allow_single_frag_table_opt,
const size_t  context_count,
const QueryCompilationDescriptor query_comp_desc,
const QueryMemoryDescriptor query_mem_desc,
RenderInfo render_info,
std::unordered_set< int > &  available_gpus,
int &  available_cpus 
)
private

Determines execution dispatch mode and required fragments for a given query step, then creates kernels to execute the query and returns them for launch.

Definition at line 1944 of file Execute.cpp.

References ExecutionOptions::allow_multifrag, catalog_(), CHECK(), CHECK_GE, CHECK_GT, anonymous_namespace{Execute.cpp}::checkWorkUnitWatchdog(), g_inner_join_fragment_skipping, QueryCompilationDescriptor::getDeviceType(), QueryMemoryDescriptor::getEntryCount(), SharedKernelContext::getFragOffsets(), QueryMemoryDescriptor::getQueryDescriptionType(), GPU, ExecutionOptions::gpu_input_mem_limit_percent, Data_Namespace::GPU_LEVEL, anonymous_namespace{Execute.cpp}::has_lazy_fetched_columns(), logger::INFO, RelAlgExecutionUnit::input_descs, KernelPerFragment, LOG, MultifragmentKernel, ExecutionOptions::outer_fragment_indices, Projection, query_mem_desc, RelAlgExecutionUnit::target_exprs, logger::thread_id(), QueryMemoryDescriptor::toString(), RelAlgExecutionUnit::use_bump_allocator, VLOG, and ExecutionOptions::with_watchdog.

1957  {
1958  std::vector<std::unique_ptr<ExecutionKernel>> execution_kernels;
1959 
1960  QueryFragmentDescriptor fragment_descriptor(
1961  ra_exe_unit,
1962  table_infos,
1963  query_comp_desc.getDeviceType() == ExecutorDeviceType::GPU
1965  : std::vector<Data_Namespace::MemoryInfo>{},
1968  CHECK(!ra_exe_unit.input_descs.empty());
1969 
1970  const auto device_type = query_comp_desc.getDeviceType();
1971  const bool uses_lazy_fetch =
1972  plan_state_->allow_lazy_fetch_ &&
1974  const bool use_multifrag_kernel = (device_type == ExecutorDeviceType::GPU) &&
1975  eo.allow_multifrag && (!uses_lazy_fetch || is_agg);
1976  const auto device_count = deviceCount(device_type);
1977  CHECK_GT(device_count, 0);
1978 
1979  fragment_descriptor.buildFragmentKernelMap(ra_exe_unit,
1980  shared_context.getFragOffsets(),
1981  device_count,
1982  device_type,
1983  use_multifrag_kernel,
1985  this);
1986  if (eo.with_watchdog && fragment_descriptor.shouldCheckWorkUnitWatchdog()) {
1987  checkWorkUnitWatchdog(ra_exe_unit, table_infos, *catalog_, device_type, device_count);
1988  }
1989 
1990  if (use_multifrag_kernel) {
1991  VLOG(1) << "Creating multifrag execution kernels";
1992  VLOG(1) << query_mem_desc.toString();
1993 
1994  // NB: We should never be on this path when the query is retried because of running
1995  // out of group by slots; also, for scan only queries on CPU we want the
1996  // high-granularity, fragment by fragment execution instead. For scan only queries on
1997  // GPU, we want the multifrag kernel path to save the overhead of allocating an output
1998  // buffer per fragment.
1999  auto multifrag_kernel_dispatch = [&ra_exe_unit,
2000  &execution_kernels,
2001  &column_fetcher,
2002  &eo,
2003  &query_comp_desc,
2004  &query_mem_desc,
2005  render_info,
2006  parent_thread_id = logger::thread_id()](
2007  const int device_id,
2008  const FragmentsList& frag_list,
2009  const int64_t rowid_lookup_key) {
2010  execution_kernels.emplace_back(
2011  std::make_unique<ExecutionKernel>(ra_exe_unit,
2013  device_id,
2014  eo,
2015  column_fetcher,
2016  query_comp_desc,
2017  query_mem_desc,
2018  frag_list,
2020  render_info,
2021  rowid_lookup_key,
2022  parent_thread_id));
2023  };
2024  fragment_descriptor.assignFragsToMultiDispatch(multifrag_kernel_dispatch);
2025  } else {
2026  VLOG(1) << "Creating one execution kernel per fragment";
2027  VLOG(1) << query_mem_desc.toString();
2028 
2029  if (!ra_exe_unit.use_bump_allocator && allow_single_frag_table_opt &&
2030  (query_mem_desc.getQueryDescriptionType() == QueryDescriptionType::Projection) &&
2031  table_infos.size() == 1 && table_infos.front().table_id > 0) {
2032  const auto max_frag_size =
2033  table_infos.front().info.getFragmentNumTuplesUpperBound();
2034  if (max_frag_size < query_mem_desc.getEntryCount()) {
2035  LOG(INFO) << "Lowering scan limit from " << query_mem_desc.getEntryCount()
2036  << " to match max fragment size " << max_frag_size
2037  << " for kernel per fragment execution path.";
2038  throw CompilationRetryNewScanLimit(max_frag_size);
2039  }
2040  }
2041 
2042  size_t frag_list_idx{0};
2043  auto fragment_per_kernel_dispatch = [&ra_exe_unit,
2044  &execution_kernels,
2045  &column_fetcher,
2046  &eo,
2047  &frag_list_idx,
2048  &device_type,
2049  &query_comp_desc,
2050  &query_mem_desc,
2051  render_info,
2052  parent_thread_id = logger::thread_id()](
2053  const int device_id,
2054  const FragmentsList& frag_list,
2055  const int64_t rowid_lookup_key) {
2056  if (!frag_list.size()) {
2057  return;
2058  }
2059  CHECK_GE(device_id, 0);
2060 
2061  execution_kernels.emplace_back(
2062  std::make_unique<ExecutionKernel>(ra_exe_unit,
2063  device_type,
2064  device_id,
2065  eo,
2066  column_fetcher,
2067  query_comp_desc,
2068  query_mem_desc,
2069  frag_list,
2071  render_info,
2072  rowid_lookup_key,
2073  parent_thread_id));
2074  ++frag_list_idx;
2075  };
2076 
2077  fragment_descriptor.assignFragsToKernelDispatch(fragment_per_kernel_dispatch,
2078  ra_exe_unit);
2079  }
2080 
2081  return execution_kernels;
2082 }
bool is_agg(const Analyzer::Expr *expr)
std::vector< Analyzer::Expr * > target_exprs
ExecutorDeviceType getDeviceType() const
const std::vector< uint64_t > & getFragOffsets()
std::string toString() const
Data_Namespace::DataMgr & getDataMgr() const
Definition: Catalog.h:195
#define LOG(tag)
Definition: Logger.h:188
std::vector< ColumnLazyFetchInfo > getColLazyFetchInfo(const std::vector< Analyzer::Expr * > &target_exprs) const
Definition: Execute.cpp:305
std::vector< InputDescriptor > input_descs
#define CHECK_GE(x, y)
Definition: Logger.h:210
int deviceCount(const ExecutorDeviceType) const
Definition: Execute.cpp:615
#define CHECK_GT(x, y)
Definition: Logger.h:209
std::vector< FragmentsPerTable > FragmentsList
bool g_inner_join_fragment_skipping
Definition: Execute.cpp:82
const bool allow_multifrag
CHECK(cgen_state)
const Catalog_Namespace::Catalog * catalog_
Definition: Execute.h:900
const double gpu_input_mem_limit_percent
std::unique_ptr< PlanState > plan_state_
Definition: Execute.h:870
void checkWorkUnitWatchdog(const RelAlgExecutionUnit &ra_exe_unit, const std::vector< InputTableInfo > &table_infos, const Catalog_Namespace::Catalog &cat, const ExecutorDeviceType device_type, const int device_count)
Definition: Execute.cpp:1069
std::vector< MemoryInfo > getMemoryInfo(const MemoryLevel memLevel)
Definition: DataMgr.cpp:291
const std::vector< size_t > outer_fragment_indices
ThreadId thread_id()
Definition: Logger.cpp:715
bool has_lazy_fetched_columns(const std::vector< ColumnLazyFetchInfo > &fetched_cols)
Definition: Execute.cpp:1933
#define VLOG(n)
Definition: Logger.h:291
const bool with_watchdog
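
The dispatch-mode decision above reduces to: multi-fragment kernels are used only on GPU, only when the execution options allow them, and only when lazy fetch is not in play (or the query is an aggregate). A condensed sketch of that predicate, assuming a hypothetical DispatchInputs struct that stands in for the relevant ExecutionOptions/PlanState fields:

// Condensed sketch of the kernel dispatch-mode choice in createKernels().
// DispatchInputs is a hypothetical stand-in, not the OmniSciDB API.
struct DispatchInputs {
  bool is_gpu;           // query_comp_desc.getDeviceType() == ExecutorDeviceType::GPU
  bool allow_multifrag;  // eo.allow_multifrag
  bool uses_lazy_fetch;  // lazy fetch enabled and lazily fetched columns present
  bool is_agg;           // aggregate query
};

enum class DispatchMode { MultifragmentKernel, KernelPerFragment };

DispatchMode choose_dispatch_mode(const DispatchInputs& in) {
  const bool use_multifrag_kernel =
      in.is_gpu && in.allow_multifrag && (!in.uses_lazy_fetch || in.is_agg);
  return use_multifrag_kernel ? DispatchMode::MultifragmentKernel
                              : DispatchMode::KernelPerFragment;
}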

int Executor::deviceCount ( const ExecutorDeviceType  device_type) const
private

Definition at line 615 of file Execute.cpp.

References catalog_(), CHECK(), and GPU.

615  {
616  if (device_type == ExecutorDeviceType::GPU) {
617  const auto cuda_mgr = catalog_->getDataMgr().getCudaMgr();
618  CHECK(cuda_mgr);
619  return cuda_mgr->getDeviceCount();
620  } else {
621  return 1;
622  }
623 }
CudaMgr_Namespace::CudaMgr * getCudaMgr() const
Definition: DataMgr.h:207
Data_Namespace::DataMgr & getDataMgr() const
Definition: Catalog.h:195
CHECK(cgen_state)
const Catalog_Namespace::Catalog * catalog_
Definition: Execute.h:900

int Executor::deviceCountForMemoryLevel ( const Data_Namespace::MemoryLevel  memory_level) const
private

Definition at line 625 of file Execute.cpp.

References CPU, GPU, and Data_Namespace::GPU_LEVEL.

626  {
627  return memory_level == GPU_LEVEL ? deviceCount(ExecutorDeviceType::GPU)
628  : deviceCount(ExecutorDeviceType::CPU);
629 }
ExecutorDeviceType
int deviceCount(const ExecutorDeviceType) const
Definition: Execute.cpp:615
int64_t Executor::deviceCycles ( int  milliseconds) const
private

Definition at line 3140 of file Execute.cpp.

References catalog_(), and CHECK().

3140  {
3141  CHECK(catalog_);
3142  const auto cuda_mgr = catalog_->getDataMgr().getCudaMgr();
3143  CHECK(cuda_mgr);
3144  const auto& dev_props = cuda_mgr->getAllDeviceProperties();
3145  return static_cast<int64_t>(dev_props.front().clockKhz) * milliseconds;
3146 }
CudaMgr_Namespace::CudaMgr * getCudaMgr() const
Definition: DataMgr.h:207
Data_Namespace::DataMgr & getDataMgr() const
Definition: Catalog.h:195
CHECK(cgen_state)
const Catalog_Namespace::Catalog * catalog_
Definition: Execute.h:900
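
Since clockKhz is the device clock rate in kilohertz and one kilohertz is one cycle per millisecond, multiplying the kHz figure by a duration in milliseconds yields clock cycles directly: cycles = clockKhz * milliseconds. For example, a hypothetical device reporting clockKhz = 1'480'000 (1.48 GHz) would give 1'480'000 * 100 = 148'000'000 cycles for a 100 ms budget. Note that only the first device's properties are consulted.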

void Executor::enableRuntimeQueryInterrupt ( const unsigned  interrupt_freq) const

Definition at line 3574 of file Execute.cpp.

References g_enable_runtime_query_interrupt, and g_runtime_query_interrupt_frequency.

3574  {
3575  // The only scenario in which we intentionally call this function is
3576  // to allow runtime query interrupt in QueryRunner for test cases.
3577  // Because the test machine's default settings do not allow runtime query
3578  // interrupt, we have to turn it on within test code when necessary.
3580  g_runtime_query_interrupt_frequency = interrupt_freq;
3581 }
unsigned g_runtime_query_interrupt_frequency
Definition: Execute.cpp:110
bool g_enable_runtime_query_interrupt
Definition: Execute.cpp:108
ResultSetPtr Executor::executeExplain ( const QueryCompilationDescriptor query_comp_desc)
private

Definition at line 1624 of file Execute.cpp.

References QueryCompilationDescriptor::getIR().

1624  {
1625  return std::make_shared<ResultSet>(query_comp_desc.getIR());
1626 }

int32_t Executor::executePlanWithGroupBy ( const RelAlgExecutionUnit ra_exe_unit,
const CompilationResult compilation_result,
const bool  hoist_literals,
ResultSetPtr results,
const ExecutorDeviceType  device_type,
std::vector< std::vector< const int8_t * >> &  col_buffers,
const std::vector< size_t >  outer_tab_frag_ids,
QueryExecutionContext query_exe_context,
const std::vector< std::vector< int64_t >> &  num_rows,
const std::vector< std::vector< uint64_t >> &  frag_offsets,
Data_Namespace::DataMgr data_mgr,
const int  device_id,
const int  outer_table_id,
const int64_t  limit,
const uint32_t  start_rowid,
const uint32_t  num_tables,
RenderInfo render_info 
)
private

Definition at line 2857 of file Execute.cpp.

References CHECK(), CHECK_NE, anonymous_namespace{Execute.cpp}::check_rows_less_than_needed(), CPU, DEBUG_TIMER, ERR_DIV_BY_ZERO, ERR_GEOS, ERR_INTERRUPTED, ERR_OUT_OF_TIME, ERR_OVERFLOW_OR_UNDERFLOW, ERR_SINGLE_VALUE_FOUND_MULTIPLE_VALUES, error_code, logger::FATAL, g_enable_dynamic_watchdog, g_enable_runtime_query_interrupt, CompilationResult::generated_code, QueryMemoryDescriptor::getEntryCount(), QueryExecutionContext::getRowSet(), GpuSharedMemoryContext::getSharedMemorySize(), GPU, CompilationResult::gpu_smem_context, RelAlgExecutionUnit::groupby_exprs, INJECT_TIMER, RelAlgExecutionUnit::input_col_descs, RelAlgExecutionUnit::input_descs, QueryExecutionContext::launchCpuCode(), QueryExecutionContext::launchGpuCode(), CompilationResult::literal_values, LOG, num_rows, shared::printContainer(), QueryExecutionContext::query_buffers_, QueryExecutionContext::query_mem_desc_, RenderInfo::render_allocator_map_ptr, RelAlgExecutionUnit::scan_limit, QueryMemoryDescriptor::setEntryCount(), RelAlgExecutionUnit::union_all, RenderInfo::useCudaBuffers(), and VLOG.

2874  {
2875  auto timer = DEBUG_TIMER(__func__);
2877  CHECK(!results);
2878  if (col_buffers.empty()) {
2879  return 0;
2880  }
2881  CHECK_NE(ra_exe_unit.groupby_exprs.size(), size_t(0));
2882  // TODO(alex):
2883  // 1. Optimize size (make keys more compact).
2884  // 2. Resize on overflow.
2885  // 3. Optimize runtime.
2886  auto hoist_buf = serializeLiterals(compilation_result.literal_values, device_id);
2887  int32_t error_code = device_type == ExecutorDeviceType::GPU ? 0 : start_rowid;
2888  const auto join_hash_table_ptrs = getJoinHashTablePtrs(device_type, device_id);
2890  interrupted_.load()) {
2891  return ERR_INTERRUPTED;
2892  }
2893 
2894  RenderAllocatorMap* render_allocator_map_ptr = nullptr;
2895  if (render_info && render_info->useCudaBuffers()) {
2896  render_allocator_map_ptr = render_info->render_allocator_map_ptr.get();
2897  }
2898 
2899  VLOG(2) << "bool(ra_exe_unit.union_all)=" << bool(ra_exe_unit.union_all)
2900  << " ra_exe_unit.input_descs="
2901  << shared::printContainer(ra_exe_unit.input_descs)
2902  << " ra_exe_unit.input_col_descs="
2903  << shared::printContainer(ra_exe_unit.input_col_descs)
2904  << " ra_exe_unit.scan_limit=" << ra_exe_unit.scan_limit
2905  << " num_rows=" << shared::printContainer(num_rows)
2906  << " frag_offsets=" << shared::printContainer(frag_offsets)
2907  << " query_exe_context->query_buffers_->num_rows_="
2908  << query_exe_context->query_buffers_->num_rows_
2909  << " query_exe_context->query_mem_desc_.getEntryCount()="
2910  << query_exe_context->query_mem_desc_.getEntryCount()
2911  << " device_id=" << device_id << " outer_table_id=" << outer_table_id
2912  << " scan_limit=" << scan_limit << " start_rowid=" << start_rowid
2913  << " num_tables=" << num_tables;
2914 
2915  RelAlgExecutionUnit ra_exe_unit_copy = ra_exe_unit;
2916  // For UNION ALL, filter out input_descs and input_col_descs that are not associated
2917  // with outer_table_id.
2918  if (ra_exe_unit_copy.union_all) {
2919  // Sort outer_table_id first, then pop the rest off of ra_exe_unit_copy.input_descs.
2920  std::stable_sort(ra_exe_unit_copy.input_descs.begin(),
2921  ra_exe_unit_copy.input_descs.end(),
2922  [outer_table_id](auto const& a, auto const& b) {
2923  return a.getTableId() == outer_table_id &&
2924  b.getTableId() != outer_table_id;
2925  });
2926  while (!ra_exe_unit_copy.input_descs.empty() &&
2927  ra_exe_unit_copy.input_descs.back().getTableId() != outer_table_id) {
2928  ra_exe_unit_copy.input_descs.pop_back();
2929  }
2930  // Filter ra_exe_unit_copy.input_col_descs.
2931  ra_exe_unit_copy.input_col_descs.remove_if(
2932  [outer_table_id](auto const& input_col_desc) {
2933  return input_col_desc->getScanDesc().getTableId() != outer_table_id;
2934  });
2935  query_exe_context->query_mem_desc_.setEntryCount(ra_exe_unit_copy.scan_limit);
2936  }
2937 
2938  if (device_type == ExecutorDeviceType::CPU) {
2939  auto cpu_generated_code = std::dynamic_pointer_cast<CpuCompilationContext>(
2940  compilation_result.generated_code);
2941  CHECK(cpu_generated_code);
2942  query_exe_context->launchCpuCode(
2943  ra_exe_unit_copy,
2944  cpu_generated_code.get(),
2945  hoist_literals,
2946  hoist_buf,
2947  col_buffers,
2948  num_rows,
2949  frag_offsets,
2950  ra_exe_unit_copy.union_all ? ra_exe_unit_copy.scan_limit : scan_limit,
2951  &error_code,
2952  num_tables,
2953  join_hash_table_ptrs);
2954  } else {
2955  try {
2956  auto gpu_generated_code = std::dynamic_pointer_cast<GpuCompilationContext>(
2957  compilation_result.generated_code);
2958  CHECK(gpu_generated_code);
2959  query_exe_context->launchGpuCode(
2960  ra_exe_unit_copy,
2961  gpu_generated_code.get(),
2962  hoist_literals,
2963  hoist_buf,
2964  col_buffers,
2965  num_rows,
2966  frag_offsets,
2967  ra_exe_unit_copy.union_all ? ra_exe_unit_copy.scan_limit : scan_limit,
2968  data_mgr,
2969  blockSize(),
2970  gridSize(),
2971  device_id,
2972  compilation_result.gpu_smem_context.getSharedMemorySize(),
2973  &error_code,
2974  num_tables,
2975  join_hash_table_ptrs,
2976  render_allocator_map_ptr);
2977  } catch (const OutOfMemory&) {
2978  return ERR_OUT_OF_GPU_MEM;
2979  } catch (const OutOfRenderMemory&) {
2980  return ERR_OUT_OF_RENDER_MEM;
2981  } catch (const StreamingTopNNotSupportedInRenderQuery&) {
2983  } catch (const std::exception& e) {
2984  LOG(FATAL) << "Error launching the GPU kernel: " << e.what();
2985  }
2986  }
2987 
2988  if (error_code == Executor::ERR_OVERFLOW_OR_UNDERFLOW ||
2989  error_code == Executor::ERR_DIV_BY_ZERO ||
2990  error_code == Executor::ERR_OUT_OF_TIME ||
2991  error_code == Executor::ERR_INTERRUPTED ||
2993  error_code == Executor::ERR_GEOS) {
2994  return error_code;
2995  }
2996 
2997  if (error_code != Executor::ERR_OVERFLOW_OR_UNDERFLOW &&
2998  error_code != Executor::ERR_DIV_BY_ZERO && !render_allocator_map_ptr) {
2999  results = query_exe_context->getRowSet(ra_exe_unit_copy,
3000  query_exe_context->query_mem_desc_);
3001  CHECK(results);
3002  VLOG(2) << "results->rowCount()=" << results->rowCount();
3003  results->holdLiterals(hoist_buf);
3004  }
3005  if (error_code < 0 && render_allocator_map_ptr) {
3006  auto const adjusted_scan_limit =
3007  ra_exe_unit_copy.union_all ? ra_exe_unit_copy.scan_limit : scan_limit;
3008  // More rows passed the filter than available slots. We don't have a count to check,
3009  // so assume we met the limit if a scan limit is set
3010  if (adjusted_scan_limit != 0) {
3011  return 0;
3012  } else {
3013  return error_code;
3014  }
3015  }
3016  if (error_code && (!scan_limit || check_rows_less_than_needed(results, scan_limit))) {
3017  return error_code; // unlucky, not enough results and we ran out of slots
3018  }
3019 
3020  return 0;
3021 }
bool useCudaBuffers() const
Definition: RenderInfo.cpp:69
const int8_t const int64_t * num_rows
static const int32_t ERR_INTERRUPTED
Definition: Execute.h:943
void setEntryCount(const size_t val)
GpuSharedMemoryContext gpu_smem_context
const std::optional< bool > union_all
#define LOG(tag)
Definition: Logger.h:188
size_t getSharedMemorySize() const
std::vector< InputDescriptor > input_descs
static const int32_t ERR_GEOS
Definition: Execute.h:949
bool g_enable_dynamic_watchdog
Definition: Execute.cpp:75
const std::list< std::shared_ptr< Analyzer::Expr > > groupby_exprs
std::vector< int64_t * > launchGpuCode(const RelAlgExecutionUnit &ra_exe_unit, const GpuCompilationContext *cu_functions, const bool hoist_literals, const std::vector< int8_t > &literal_buff, std::vector< std::vector< const int8_t * >> col_buffers, const std::vector< std::vector< int64_t >> &num_rows, const std::vector< std::vector< uint64_t >> &frag_row_offsets, const int32_t scan_limit, Data_Namespace::DataMgr *data_mgr, const unsigned block_size_x, const unsigned grid_size_x, const int device_id, const size_t shared_memory_size, int32_t *error_code, const uint32_t num_tables, const std::vector< int64_t > &join_hash_tables, RenderAllocatorMap *render_allocator_map)
std::unique_ptr< QueryMemoryInitializer > query_buffers_
int32_t executePlanWithGroupBy(const RelAlgExecutionUnit &ra_exe_unit, const CompilationResult &, const bool hoist_literals, ResultSetPtr &results, const ExecutorDeviceType device_type, std::vector< std::vector< const int8_t * >> &col_buffers, const std::vector< size_t > outer_tab_frag_ids, QueryExecutionContext *, const std::vector< std::vector< int64_t >> &num_rows, const std::vector< std::vector< uint64_t >> &frag_offsets, Data_Namespace::DataMgr *, const int device_id, const int outer_table_id, const int64_t limit, const uint32_t start_rowid, const uint32_t num_tables, RenderInfo *render_info)
Definition: Execute.cpp:2857
std::vector< int64_t > getJoinHashTablePtrs(const ExecutorDeviceType device_type, const int device_id)
Definition: Execute.cpp:3023
static const int32_t ERR_STREAMING_TOP_N_NOT_SUPPORTED_IN_RENDER_QUERY
Definition: Execute.h:947
static const int32_t ERR_DIV_BY_ZERO
Definition: Execute.h:935
ResultSetPtr getRowSet(const RelAlgExecutionUnit &ra_exe_unit, const QueryMemoryDescriptor &query_mem_desc) const
CHECK(cgen_state)
#define INJECT_TIMER(DESC)
Definition: measure.h:91
#define CHECK_NE(x, y)
Definition: Logger.h:206
static const int32_t ERR_OUT_OF_RENDER_MEM
Definition: Execute.h:939
const int8_t const int64_t const uint64_t const int32_t const int64_t int64_t uint32_t const int64_t int32_t * error_code
static const int32_t ERR_OVERFLOW_OR_UNDERFLOW
Definition: Execute.h:941
static const int32_t ERR_OUT_OF_TIME
Definition: Execute.h:942
static const int32_t ERR_SINGLE_VALUE_FOUND_MULTIPLE_VALUES
Definition: Execute.h:948
static const int32_t ERR_OUT_OF_GPU_MEM
Definition: Execute.h:936
std::shared_ptr< CompilationContext > generated_code
QueryMemoryDescriptor query_mem_desc_
std::vector< int8_t > serializeLiterals(const std::unordered_map< int, CgenState::LiteralValues > &literals, const int device_id)
Definition: Execute.cpp:353
unsigned gridSize() const
Definition: Execute.cpp:3113
std::unordered_map< int, CgenState::LiteralValues > literal_values
std::unique_ptr< RenderAllocatorMap > render_allocator_map_ptr
Definition: RenderInfo.h:32
bool check_rows_less_than_needed(const ResultSetPtr &results, const size_t scan_limit)
Definition: Execute.cpp:2850
static std::atomic< bool > interrupted_
Definition: Execute.h:879
#define DEBUG_TIMER(name)
Definition: Logger.h:313
std::vector< int64_t * > launchCpuCode(const RelAlgExecutionUnit &ra_exe_unit, const CpuCompilationContext *fn_ptrs, const bool hoist_literals, const std::vector< int8_t > &literal_buff, std::vector< std::vector< const int8_t * >> col_buffers, const std::vector< std::vector< int64_t >> &num_rows, const std::vector< std::vector< uint64_t >> &frag_row_offsets, const int32_t scan_limit, int32_t *error_code, const uint32_t num_tables, const std::vector< int64_t > &join_hash_tables)
PrintContainer< CONTAINER > printContainer(CONTAINER &container)
Definition: misc.h:63
std::list< std::shared_ptr< const InputColDescriptor > > input_col_descs
unsigned blockSize() const
Definition: Execute.cpp:3128
bool g_enable_runtime_query_interrupt
Definition: Execute.cpp:108
#define VLOG(n)
Definition: Logger.h:291
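
The UNION ALL handling above first stable-sorts the input descriptors so that entries for outer_table_id come first, pops every trailing descriptor for other tables, and then drops column descriptors that do not scan the outer table. A self-contained sketch of the same idiom, using plain ints as a stand-in for InputDescriptor table ids:

#include <algorithm>
#include <vector>

// Sketch of the UNION ALL input-filtering idiom: keep only entries for the
// outer table, preserving their relative order. Plain ints stand in for
// InputDescriptor table ids; this is not the OmniSciDB API.
std::vector<int> keep_outer_table_only(std::vector<int> table_ids,
                                       const int outer_table_id) {
  // Stable-sort entries for the outer table to the front.
  std::stable_sort(table_ids.begin(), table_ids.end(),
                   [outer_table_id](int a, int b) {
                     return a == outer_table_id && b != outer_table_id;
                   });
  // Pop everything that is not the outer table off the back.
  while (!table_ids.empty() && table_ids.back() != outer_table_id) {
    table_ids.pop_back();
  }
  return table_ids;  // e.g. {2, 1, 2, 3} with outer table 2 becomes {2, 2}
}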

int32_t Executor::executePlanWithoutGroupBy ( const RelAlgExecutionUnit ra_exe_unit,
const CompilationResult compilation_result,
const bool  hoist_literals,
ResultSetPtr results,
const std::vector< Analyzer::Expr * > &  target_exprs,
const ExecutorDeviceType  device_type,
std::vector< std::vector< const int8_t * >> &  col_buffers,
QueryExecutionContext query_exe_context,
const std::vector< std::vector< int64_t >> &  num_rows,
const std::vector< std::vector< uint64_t >> &  frag_offsets,
Data_Namespace::DataMgr data_mgr,
const int  device_id,
const uint32_t  start_rowid,
const uint32_t  num_tables,
RenderInfo render_info 
)
private

Definition at line 2657 of file Execute.cpp.

References CHECK(), CHECK_EQ, CPU, DEBUG_TIMER, ERR_DIV_BY_ZERO, ERR_GEOS, ERR_INTERRUPTED, ERR_OUT_OF_TIME, ERR_OVERFLOW_OR_UNDERFLOW, ERR_SINGLE_VALUE_FOUND_MULTIPLE_VALUES, error_code, RelAlgExecutionUnit::estimator, QueryExecutionContext::estimator_result_set_, logger::FATAL, g_bigint_count, g_enable_dynamic_watchdog, g_enable_runtime_query_interrupt, CompilationResult::generated_code, get_target_info(), QueryExecutionContext::getAggInitValForIndex(), QueryMemoryDescriptor::getPaddedSlotWidthBytes(), GpuSharedMemoryContext::getSharedMemorySize(), GPU, CompilationResult::gpu_smem_context, INJECT_TIMER, is_distinct_target(), RenderInfo::isPotentialInSituRender(), GpuSharedMemoryContext::isSharedMemoryUsed(), kAPPROX_COUNT_DISTINCT, kAVG, kCOUNT, kSAMPLE, QueryExecutionContext::launchCpuCode(), QueryExecutionContext::launchGpuCode(), CompilationResult::literal_values, LOG, num_rows, out, QueryExecutionContext::query_buffers_, QueryExecutionContext::query_mem_desc_, reduceResults(), RenderInfo::render_allocator_map_ptr, takes_float_argument(), and RenderInfo::useCudaBuffers().

2672  {
2674  auto timer = DEBUG_TIMER(__func__);
2675  CHECK(!results);
2676  if (col_buffers.empty()) {
2677  return 0;
2678  }
2679 
2680  RenderAllocatorMap* render_allocator_map_ptr = nullptr;
2681  if (render_info) {
2682  // TODO(adb): make sure that we either never get here in the CPU case, or if we do get
2683  // here, we are in non-insitu mode.
2684  CHECK(render_info->useCudaBuffers() || !render_info->isPotentialInSituRender())
2685  << "CUDA disabled rendering in the executePlanWithoutGroupBy query path is "
2686  "currently unsupported.";
2687  render_allocator_map_ptr = render_info->render_allocator_map_ptr.get();
2688  }
2689 
2690  int32_t error_code = device_type == ExecutorDeviceType::GPU ? 0 : start_rowid;
2691  std::vector<int64_t*> out_vec;
2692  const auto hoist_buf = serializeLiterals(compilation_result.literal_values, device_id);
2693  const auto join_hash_table_ptrs = getJoinHashTablePtrs(device_type, device_id);
2694  std::unique_ptr<OutVecOwner> output_memory_scope;
2696  interrupted_.load()) {
2697  resetInterrupt();
2699  }
2700  if (device_type == ExecutorDeviceType::CPU) {
2701  auto cpu_generated_code = std::dynamic_pointer_cast<CpuCompilationContext>(
2702  compilation_result.generated_code);
2703  CHECK(cpu_generated_code);
2704  out_vec = query_exe_context->launchCpuCode(ra_exe_unit,
2705  cpu_generated_code.get(),
2706  hoist_literals,
2707  hoist_buf,
2708  col_buffers,
2709  num_rows,
2710  frag_offsets,
2711  0,
2712  &error_code,
2713  num_tables,
2714  join_hash_table_ptrs);
2715  output_memory_scope.reset(new OutVecOwner(out_vec));
2716  } else {
2717  auto gpu_generated_code = std::dynamic_pointer_cast<GpuCompilationContext>(
2718  compilation_result.generated_code);
2719  CHECK(gpu_generated_code);
2720  try {
2721  out_vec = query_exe_context->launchGpuCode(
2722  ra_exe_unit,
2723  gpu_generated_code.get(),
2724  hoist_literals,
2725  hoist_buf,
2726  col_buffers,
2727  num_rows,
2728  frag_offsets,
2729  0,
2730  data_mgr,
2731  blockSize(),
2732  gridSize(),
2733  device_id,
2734  compilation_result.gpu_smem_context.getSharedMemorySize(),
2735  &error_code,
2736  num_tables,
2737  join_hash_table_ptrs,
2738  render_allocator_map_ptr);
2739  output_memory_scope.reset(new OutVecOwner(out_vec));
2740  } catch (const OutOfMemory&) {
2741  return ERR_OUT_OF_GPU_MEM;
2742  } catch (const std::exception& e) {
2743  LOG(FATAL) << "Error launching the GPU kernel: " << e.what();
2744  }
2745  }
2746  if (error_code == Executor::ERR_OVERFLOW_OR_UNDERFLOW ||
2747  error_code == Executor::ERR_DIV_BY_ZERO ||
2748  error_code == Executor::ERR_OUT_OF_TIME ||
2749  error_code == Executor::ERR_INTERRUPTED ||
2751  error_code == Executor::ERR_GEOS) {
2752  return error_code;
2753  }
2754  if (ra_exe_unit.estimator) {
2755  CHECK(!error_code);
2756  results =
2757  std::shared_ptr<ResultSet>(query_exe_context->estimator_result_set_.release());
2758  return 0;
2759  }
2760  std::vector<int64_t> reduced_outs;
2761  const auto num_frags = col_buffers.size();
2762  const size_t entry_count =
2763  device_type == ExecutorDeviceType::GPU
2764  ? (compilation_result.gpu_smem_context.isSharedMemoryUsed()
2765  ? 1
2766  : blockSize() * gridSize() * num_frags)
2767  : num_frags;
2768  if (size_t(1) == entry_count) {
2769  for (auto out : out_vec) {
2770  CHECK(out);
2771  reduced_outs.push_back(*out);
2772  }
2773  } else {
2774  size_t out_vec_idx = 0;
2775 
2776  for (const auto target_expr : target_exprs) {
2777  const auto agg_info = get_target_info(target_expr, g_bigint_count);
2778  CHECK(agg_info.is_agg);
2779 
2780  const int num_iterations = agg_info.sql_type.is_geometry()
2781  ? agg_info.sql_type.get_physical_coord_cols()
2782  : 1;
2783 
2784  for (int i = 0; i < num_iterations; i++) {
2785  int64_t val1;
2786  const bool float_argument_input = takes_float_argument(agg_info);
2787  if (is_distinct_target(agg_info)) {
2788  CHECK(agg_info.agg_kind == kCOUNT ||
2789  agg_info.agg_kind == kAPPROX_COUNT_DISTINCT);
2790  val1 = out_vec[out_vec_idx][0];
2791  error_code = 0;
2792  } else {
2793  const auto chosen_bytes = static_cast<size_t>(
2794  query_exe_context->query_mem_desc_.getPaddedSlotWidthBytes(out_vec_idx));
2795  std::tie(val1, error_code) = Executor::reduceResults(
2796  agg_info.agg_kind,
2797  agg_info.sql_type,
2798  query_exe_context->getAggInitValForIndex(out_vec_idx),
2799  float_argument_input ? sizeof(int32_t) : chosen_bytes,
2800  out_vec[out_vec_idx],
2801  entry_count,
2802  false,
2803  float_argument_input);
2804  }
2805  if (error_code) {
2806  break;
2807  }
2808  reduced_outs.push_back(val1);
2809  if (agg_info.agg_kind == kAVG ||
2810  (agg_info.agg_kind == kSAMPLE &&
2811  (agg_info.sql_type.is_varlen() || agg_info.sql_type.is_geometry()))) {
2812  const auto chosen_bytes = static_cast<size_t>(
2813  query_exe_context->query_mem_desc_.getPaddedSlotWidthBytes(out_vec_idx +
2814  1));
2815  int64_t val2;
2816  std::tie(val2, error_code) = Executor::reduceResults(
2817  agg_info.agg_kind == kAVG ? kCOUNT : agg_info.agg_kind,
2818  agg_info.sql_type,
2819  query_exe_context->getAggInitValForIndex(out_vec_idx + 1),
2820  float_argument_input ? sizeof(int32_t) : chosen_bytes,
2821  out_vec[out_vec_idx + 1],
2822  entry_count,
2823  false,
2824  false);
2825  if (error_code) {
2826  break;
2827  }
2828  reduced_outs.push_back(val2);
2829  ++out_vec_idx;
2830  }
2831  ++out_vec_idx;
2832  }
2833  }
2834  }
2835 
2836  if (error_code) {
2837  return error_code;
2838  }
2839 
2840  CHECK_EQ(size_t(1), query_exe_context->query_buffers_->result_sets_.size());
2841  auto rows_ptr = std::shared_ptr<ResultSet>(
2842  query_exe_context->query_buffers_->result_sets_[0].release());
2843  rows_ptr->fillOneEntry(reduced_outs);
2844  results = std::move(rows_ptr);
2845  return error_code;
2846 }
#define CHECK_EQ(x, y)
Definition: Logger.h:205
bool useCudaBuffers() const
Definition: RenderInfo.cpp:69
const int8_t const int64_t * num_rows
static const int32_t ERR_INTERRUPTED
Definition: Execute.h:943
GpuSharedMemoryContext gpu_smem_context
#define LOG(tag)
Definition: Logger.h:188
TargetInfo get_target_info(const PointerType target_expr, const bool bigint_count)
Definition: TargetInfo.h:78
size_t getSharedMemorySize() const
static std::pair< int64_t, int32_t > reduceResults(const SQLAgg agg, const SQLTypeInfo &ti, const int64_t agg_init_val, const int8_t out_byte_width, const int64_t *out_vec, const size_t out_vec_sz, const bool is_group_by, const bool float_argument_input)
Definition: Execute.cpp:632
static const int32_t ERR_GEOS
Definition: Execute.h:949
bool g_enable_dynamic_watchdog
Definition: Execute.cpp:75
bool takes_float_argument(const TargetInfo &target_info)
Definition: TargetInfo.h:133
std::vector< int64_t * > launchGpuCode(const RelAlgExecutionUnit &ra_exe_unit, const GpuCompilationContext *cu_functions, const bool hoist_literals, const std::vector< int8_t > &literal_buff, std::vector< std::vector< const int8_t * >> col_buffers, const std::vector< std::vector< int64_t >> &num_rows, const std::vector< std::vector< uint64_t >> &frag_row_offsets, const int32_t scan_limit, Data_Namespace::DataMgr *data_mgr, const unsigned block_size_x, const unsigned grid_size_x, const int device_id, const size_t shared_memory_size, int32_t *error_code, const uint32_t num_tables, const std::vector< int64_t > &join_hash_tables, RenderAllocatorMap *render_allocator_map)
std::unique_ptr< QueryMemoryInitializer > query_buffers_
std::vector< int64_t > getJoinHashTablePtrs(const ExecutorDeviceType device_type, const int device_id)
Definition: Execute.cpp:3023
static const int32_t ERR_DIV_BY_ZERO
Definition: Execute.h:935
CHECK(cgen_state)
#define INJECT_TIMER(DESC)
Definition: measure.h:91
const int8_t const int64_t const uint64_t const int32_t const int64_t int64_t uint32_t const int64_t int32_t * error_code
static const int32_t ERR_OVERFLOW_OR_UNDERFLOW
Definition: Execute.h:941
static const int32_t ERR_OUT_OF_TIME
Definition: Execute.h:942
bool g_bigint_count
bool is_distinct_target(const TargetInfo &target_info)
Definition: TargetInfo.h:129
const int8_t getPaddedSlotWidthBytes(const size_t slot_idx) const
int64_t getAggInitValForIndex(const size_t index) const
static const int32_t ERR_SINGLE_VALUE_FOUND_MULTIPLE_VALUES
Definition: Execute.h:948
const std::shared_ptr< Analyzer::Estimator > estimator
static const int32_t ERR_OUT_OF_GPU_MEM
Definition: Execute.h:936
std::shared_ptr< CompilationContext > generated_code
QueryMemoryDescriptor query_mem_desc_
int32_t executePlanWithoutGroupBy(const RelAlgExecutionUnit &ra_exe_unit, const CompilationResult &, const bool hoist_literals, ResultSetPtr &results, const std::vector< Analyzer::Expr * > &target_exprs, const ExecutorDeviceType device_type, std::vector< std::vector< const int8_t * >> &col_buffers, QueryExecutionContext *query_exe_context, const std::vector< std::vector< int64_t >> &num_rows, const std::vector< std::vector< uint64_t >> &frag_offsets, Data_Namespace::DataMgr *data_mgr, const int device_id, const uint32_t start_rowid, const uint32_t num_tables, RenderInfo *render_info)
Definition: Execute.cpp:2657
std::vector< int8_t > serializeLiterals(const std::unordered_map< int, CgenState::LiteralValues > &literals, const int device_id)
Definition: Execute.cpp:353
Definition: sqldefs.h:76
unsigned gridSize() const
Definition: Execute.cpp:3113
std::unordered_map< int, CgenState::LiteralValues > literal_values
std::unique_ptr< RenderAllocatorMap > render_allocator_map_ptr
Definition: RenderInfo.h:32
void resetInterrupt()
bool isPotentialInSituRender() const
Definition: RenderInfo.cpp:64
static std::atomic< bool > interrupted_
Definition: Execute.h:879
#define DEBUG_TIMER(name)
Definition: Logger.h:313
std::vector< int64_t * > launchCpuCode(const RelAlgExecutionUnit &ra_exe_unit, const CpuCompilationContext *fn_ptrs, const bool hoist_literals, const std::vector< int8_t > &literal_buff, std::vector< std::vector< const int8_t * >> col_buffers, const std::vector< std::vector< int64_t >> &num_rows, const std::vector< std::vector< uint64_t >> &frag_row_offsets, const int32_t scan_limit, int32_t *error_code, const uint32_t num_tables, const std::vector< int64_t > &join_hash_tables)
unsigned blockSize() const
Definition: Execute.cpp:3128
const int8_t const int64_t const uint64_t const int32_t const int64_t int64_t ** out
std::unique_ptr< ResultSet > estimator_result_set_
Definition: sqldefs.h:72
bool g_enable_runtime_query_interrupt
Definition: Execute.cpp:108
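
For queries without a group by, each GPU thread (or each fragment on CPU) writes its partial aggregate into its own output slot, so the host reduces entry_count partial values per target through reduceResults(). A condensed sketch of how many entries are reduced and of a SUM/COUNT-style reduction; the parameter names are stand-ins, and the real reduceResults() additionally handles MIN/MAX/AVG, float-typed slots and aggregate-specific init values:

#include <cstddef>
#include <cstdint>
#include <vector>

// How many partial-result entries the host must reduce, mirroring the
// entry_count computation above. Parameters stand in for blockSize(),
// gridSize(), col_buffers.size() and isSharedMemoryUsed().
size_t partial_entry_count(bool is_gpu, bool smem_reduced, unsigned block_size,
                           unsigned grid_size, size_t num_frags) {
  if (!is_gpu) {
    return num_frags;  // one partial value per fragment on CPU
  }
  return smem_reduced ? 1  // already reduced on-device in shared memory
                      : static_cast<size_t>(block_size) * grid_size * num_frags;
}

// Simplified SUM/COUNT-style reduction over the partial values of one target.
int64_t reduce_sum_like(const std::vector<int64_t>& partials) {
  int64_t acc = 0;
  for (const int64_t v : partials) {
    acc += v;
  }
  return acc;
}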

ResultSetPtr Executor::executeTableFunction ( const TableFunctionExecutionUnit  exe_unit,
const std::vector< InputTableInfo > &  table_infos,
const CompilationOptions co,
const ExecutionOptions eo,
const Catalog_Namespace::Catalog cat 
)
private

Compiles and dispatches a table function; that is, a function that takes as input one or more columns and returns a ResultSet, which can be consumed by subsequent execution steps.

Definition at line 1598 of file Execute.cpp.

References CHECK_EQ, TableFunctionCompilationContext::compile(), CompilationOptions::device_type, TableFunctionExecutionContext::execute(), and INJECT_TIMER.

1603  {
1604  INJECT_TIMER(Exec_executeTableFunction);
1605  nukeOldState(false, table_infos, nullptr);
1606 
1607  ColumnCacheMap column_cache; // Note: if we add retries to the table function
1608  // framework, we may want to move this up a level
1609 
1610  ColumnFetcher column_fetcher(this, column_cache);
1611  TableFunctionCompilationContext compilation_context;
1612  compilation_context.compile(exe_unit, co, this);
1613 
1615  CHECK_EQ(table_infos.size(), size_t(1));
1616  return exe_context.execute(exe_unit,
1617  table_infos.front(),
1618  &compilation_context,
1619  column_fetcher,
1620  co.device_type,
1621  this);
1622 }
#define CHECK_EQ(x, y)
Definition: Logger.h:205
const std::shared_ptr< RowSetMemoryOwner > getRowSetMemoryOwner() const
Definition: Execute.cpp:255
#define INJECT_TIMER(DESC)
Definition: measure.h:91
void nukeOldState(const bool allow_lazy_fetch, const std::vector< InputTableInfo > &query_infos, const RelAlgExecutionUnit *ra_exe_unit)
Definition: Execute.cpp:3038
ExecutorDeviceType device_type
std::unordered_map< int, std::unordered_map< int, std::shared_ptr< const ColumnarResults >>> ColumnCacheMap
void compile(const TableFunctionExecutionUnit &exe_unit, const CompilationOptions &co, Executor *executor)

void Executor::executeUpdate ( const RelAlgExecutionUnit ra_exe_unit,
const std::vector< InputTableInfo > &  table_infos,
const CompilationOptions co,
const ExecutionOptions eo,
const Catalog_Namespace::Catalog cat,
std::shared_ptr< RowSetMemoryOwner row_set_mem_owner,
const UpdateLogForFragment::Callback cb,
const bool  is_agg 
)
private

Definition at line 62 of file ExecuteUpdate.cpp.

References CHECK(), CHECK_EQ, CHECK_GT, CPU, FragmentsPerTable::fragment_ids, SharedKernelContext::getFragmentResults(), KernelPerFragment, query_mem_desc, ExecutionKernel::run(), logger::thread_id(), timer_start(), timer_stop(), and VLOG.

69  {
70  CHECK(cb);
71  VLOG(1) << "Executing update/delete work unit:" << ra_exe_unit_in;
72 
73  const auto ra_exe_unit = addDeletedColumn(ra_exe_unit_in, co);
74  ColumnCacheMap column_cache;
75 
76  ColumnFetcher column_fetcher(this, column_cache);
77  CHECK_GT(ra_exe_unit.input_descs.size(), size_t(0));
78  const auto table_id = ra_exe_unit.input_descs[0].getTableId();
79  const auto& outer_fragments = table_infos.front().info.fragments;
80 
81  std::vector<FragmentsPerTable> fragments = {{0, {0}}};
82  for (size_t tab_idx = 1; tab_idx < ra_exe_unit.input_descs.size(); tab_idx++) {
83  int table_id = ra_exe_unit.input_descs[tab_idx].getTableId();
84  CHECK_EQ(table_infos[tab_idx].table_id, table_id);
85  const auto& fragmentsPerTable = table_infos[tab_idx].info.fragments;
86  FragmentsPerTable entry = {table_id, {}};
87  for (size_t innerFragId = 0; innerFragId < fragmentsPerTable.size(); innerFragId++) {
88  entry.fragment_ids.push_back(innerFragId);
89  }
90  fragments.push_back(entry);
91  }
92 
93  if (outer_fragments.empty()) {
94  return;
95  }
96 
97  const auto max_tuple_count_fragment_it = std::max_element(
98  outer_fragments.begin(), outer_fragments.end(), [](const auto& a, const auto& b) {
99  return a.getNumTuples() < b.getNumTuples();
100  });
101  CHECK(max_tuple_count_fragment_it != outer_fragments.end());
102  int64_t global_max_groups_buffer_entry_guess =
103  max_tuple_count_fragment_it->getNumTuples();
104  if (is_agg) {
105  global_max_groups_buffer_entry_guess = std::min(
106  2 * global_max_groups_buffer_entry_guess, static_cast<int64_t>(100'000'000));
107  }
108 
109  auto query_comp_desc = std::make_unique<QueryCompilationDescriptor>();
110  std::unique_ptr<QueryMemoryDescriptor> query_mem_desc;
111  {
112  auto clock_begin = timer_start();
113  std::lock_guard<std::mutex> compilation_lock(compilation_mutex_);
114  compilation_queue_time_ms_ += timer_stop(clock_begin);
115 
116  query_mem_desc = query_comp_desc->compile(global_max_groups_buffer_entry_guess,
117  8,
118  /*has_cardinality_estimation=*/true,
119  ra_exe_unit,
120  table_infos,
121  column_fetcher,
122  co,
123  eo,
124  nullptr,
125  this);
126  }
127  CHECK(query_mem_desc);
128 
129  for (size_t fragment_index = 0; fragment_index < outer_fragments.size();
130  ++fragment_index) {
131  const int64_t crt_fragment_tuple_count =
132  outer_fragments[fragment_index].getNumTuples();
133  if (crt_fragment_tuple_count == 0) {
134  // nothing to update
135  continue;
136  }
137 
138  SharedKernelContext shared_context(table_infos);
139  fragments[0] = {table_id, {fragment_index}};
140  {
141  ExecutionKernel current_fragment_kernel(ra_exe_unit,
143  0,
144  eo,
145  column_fetcher,
146  *query_comp_desc,
147  *query_mem_desc,
148  fragments,
150  /*render_info=*/nullptr,
151  /*rowid_lookup_key=*/-1,
153 
154  auto clock_begin = timer_start();
155  std::lock_guard<std::mutex> kernel_lock(kernel_mutex_);
156  kernel_queue_time_ms_ += timer_stop(clock_begin);
157 
158  current_fragment_kernel.run(this, shared_context);
159  }
160  const auto& proj_fragment_results = shared_context.getFragmentResults();
161  if (proj_fragment_results.empty()) {
162  continue;
163  }
164  const auto& proj_fragment_result = proj_fragment_results[0];
165  const auto proj_result_set = proj_fragment_result.first;
166  CHECK(proj_result_set);
167  cb({outer_fragments[fragment_index], fragment_index, proj_result_set});
168  }
169 }
bool is_agg(const Analyzer::Expr *expr)
#define CHECK_EQ(x, y)
Definition: Logger.h:205
int64_t kernel_queue_time_ms_
Definition: Execute.h:903
int64_t compilation_queue_time_ms_
Definition: Execute.h:904
std::vector< InputDescriptor > input_descs
TypeR::rep timer_stop(Type clock_begin)
Definition: measure.h:46
static std::mutex kernel_mutex_
Definition: Execute.h:952
#define CHECK_GT(x, y)
Definition: Logger.h:209
CHECK(cgen_state)
RelAlgExecutionUnit addDeletedColumn(const RelAlgExecutionUnit &ra_exe_unit, const CompilationOptions &co)
Definition: Execute.cpp:3198
std::unordered_map< int, std::unordered_map< int, std::shared_ptr< const ColumnarResults >>> ColumnCacheMap
ThreadId thread_id()
Definition: Logger.cpp:715
static std::mutex compilation_mutex_
Definition: Execute.h:951
std::vector< size_t > fragment_ids
#define VLOG(n)
Definition: Logger.h:291
Type timer_start()
Definition: measure.h:40
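
executeUpdate() runs the work unit one outer fragment at a time, skips empty fragments, and hands each fragment's projected result set to the caller-supplied callback (inner tables are always included in full). A stripped-down sketch of that control flow; the types below are illustrative placeholders, not the real fragmenter, kernel, or result-set classes:

#include <cstddef>
#include <functional>
#include <vector>

// Illustrative placeholders for fragment metadata and a projected result.
struct FragmentStub { size_t num_tuples; };
struct ResultStub {};
using PerFragmentCallback =
    std::function<void(const FragmentStub&, size_t, ResultStub)>;

void run_per_fragment(const std::vector<FragmentStub>& outer_fragments,
                      const std::function<ResultStub(size_t)>& run_kernel_on_fragment,
                      const PerFragmentCallback& cb) {
  for (size_t i = 0; i < outer_fragments.size(); ++i) {
    if (outer_fragments[i].num_tuples == 0) {
      continue;  // nothing to update in this fragment
    }
    cb(outer_fragments[i], i, run_kernel_on_fragment(i));
  }
}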

ResultSetPtr Executor::executeWorkUnit ( size_t &  max_groups_buffer_entry_guess,
const bool  is_agg,
const std::vector< InputTableInfo > &  query_infos,
const RelAlgExecutionUnit ra_exe_unit_in,
const CompilationOptions co,
const ExecutionOptions options,
const Catalog_Namespace::Catalog cat,
std::shared_ptr< RowSetMemoryOwner row_set_mem_owner,
RenderInfo render_info,
const bool  has_cardinality_estimation,
ColumnCacheMap column_cache 
)
private

Definition at line 1310 of file Execute.cpp.

References executor_id_(), CompilationRetryNewScanLimit::new_scan_limit_, anonymous_namespace{Execute.cpp}::replace_scan_limit(), run_benchmark_import::result, and VLOG.

1321  {
1322  VLOG(1) << "Executor " << executor_id_ << " is executing work unit:" << ra_exe_unit_in;
1323 
1324  ScopeGuard cleanup_post_execution = [this] {
1325  // cleanup/unpin GPU buffer allocations
1326  // TODO: separate out this state into a single object
1327  plan_state_.reset(nullptr);
1328  if (cgen_state_) {
1329  cgen_state_->in_values_bitmaps_.clear();
1330  }
1331  };
1332 
1333  try {
1334  auto result = executeWorkUnitImpl(max_groups_buffer_entry_guess,
1335  is_agg,
1336  true,
1337  query_infos,
1338  ra_exe_unit_in,
1339  co,
1340  eo,
1341  cat,
1342  row_set_mem_owner,
1343  render_info,
1344  has_cardinality_estimation,
1345  column_cache);
1346  if (result) {
1347  result->setKernelQueueTime(kernel_queue_time_ms_);
1348  result->addCompilationQueueTime(compilation_queue_time_ms_);
1349  }
1350  return result;
1351  } catch (const CompilationRetryNewScanLimit& e) {
1352  auto result =
1353  executeWorkUnitImpl(max_groups_buffer_entry_guess,
1354  is_agg,
1355  false,
1356  query_infos,
1357  replace_scan_limit(ra_exe_unit_in, e.new_scan_limit_),
1358  co,
1359  eo,
1360  cat,
1361  row_set_mem_owner,
1362  render_info,
1363  has_cardinality_estimation,
1364  column_cache);
1365  if (result) {
1366  result->setKernelQueueTime(kernel_queue_time_ms_);
1367  result->addCompilationQueueTime(compilation_queue_time_ms_);
1368  }
1369  return result;
1370  }
1371 }
bool is_agg(const Analyzer::Expr *expr)
int64_t kernel_queue_time_ms_
Definition: Execute.h:903
int64_t compilation_queue_time_ms_
Definition: Execute.h:904
std::unique_ptr< CgenState > cgen_state_
Definition: Execute.h:855
const ExecutorId executor_id_
Definition: Execute.h:899
std::unique_ptr< PlanState > plan_state_
Definition: Execute.h:870
RelAlgExecutionUnit replace_scan_limit(const RelAlgExecutionUnit &ra_exe_unit_in, const size_t new_scan_limit)
Definition: Execute.cpp:1291
ResultSetPtr executeWorkUnitImpl(size_t &max_groups_buffer_entry_guess, const bool is_agg, const bool allow_single_frag_table_opt, const std::vector< InputTableInfo > &, const RelAlgExecutionUnit &, const CompilationOptions &, const ExecutionOptions &options, const Catalog_Namespace::Catalog &, std::shared_ptr< RowSetMemoryOwner >, RenderInfo *render_info, const bool has_cardinality_estimation, ColumnCacheMap &column_cache)
Definition: Execute.cpp:1373
#define VLOG(n)
Definition: Logger.h:291
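
executeWorkUnit() is a thin retry wrapper: it runs executeWorkUnitImpl() once and, if the kernel-per-fragment path threw CompilationRetryNewScanLimit, re-runs the same work unit with the scan limit lowered to the suggested value. A minimal sketch of that pattern; RetryNewScanLimit and run_with_scan_limit_retry are illustrative placeholders, not the real exception type or API:

#include <cstddef>
#include <stdexcept>

// Placeholder for CompilationRetryNewScanLimit: carries the lowered scan limit.
struct RetryNewScanLimit : std::runtime_error {
  explicit RetryNewScanLimit(size_t limit)
      : std::runtime_error("retry with new scan limit"), new_scan_limit(limit) {}
  size_t new_scan_limit;
};

// Run once; if the executor asks for a lower scan limit, retry exactly once
// with the limit it suggested, mirroring the try/catch in executeWorkUnit().
template <typename ExecuteFn>
auto run_with_scan_limit_retry(ExecuteFn execute, size_t scan_limit) {
  try {
    return execute(scan_limit);
  } catch (const RetryNewScanLimit& e) {
    return execute(e.new_scan_limit);
  }
}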

ResultSetPtr Executor::executeWorkUnitImpl ( size_t &  max_groups_buffer_entry_guess,
const bool  is_agg,
const bool  allow_single_frag_table_opt,
const std::vector< InputTableInfo > &  query_infos,
const RelAlgExecutionUnit ra_exe_unit_in,
const CompilationOptions co,
const ExecutionOptions options,
const Catalog_Namespace::Catalog cat,
std::shared_ptr< RowSetMemoryOwner row_set_mem_owner,
RenderInfo render_info,
const bool  has_cardinality_estimation,
ColumnCacheMap column_cache 
)
private

Definition at line 1373 of file Execute.cpp.

References CompilationOptions::allow_lazy_fetch, ExecutionOptions::allow_runtime_query_interrupt, CHECK(), anonymous_namespace{Execute.cpp}::compute_buffer_entry_guess(), CPU, cpu_threads(), CompilationOptions::device_type, ExecutionOptions::executor_type, CompilationOptions::explain_type, CompilationOptions::filter_on_deleted_column, g_use_tbb_pool, get_available_gpus(), get_context_count(), get_min_byte_width(), QueryExecutionError::getErrorCode(), CompilationOptions::hoist_literals, INJECT_TIMER, ExecutionOptions::just_explain, ExecutionOptions::just_validate, MAX_BYTE_WIDTH_SUPPORTED, Native, CompilationOptions::opt_level, Projection, CompilationOptions::register_intel_jit_listener, timer_start(), timer_stop(), VLOG, CompilationOptions::with_dynamic_watchdog, and ExecutionOptions::with_dynamic_watchdog.

1385  {
1386  INJECT_TIMER(Exec_executeWorkUnit);
1387  const auto ra_exe_unit = addDeletedColumn(ra_exe_unit_in, co);
1388  const auto device_type = getDeviceTypeForTargets(ra_exe_unit, co.device_type);
1389  CHECK(!query_infos.empty());
1390  if (!max_groups_buffer_entry_guess) {
1391  // The query has failed the first execution attempt because of running out
1392  // of group by slots. Make the conservative choice: allocate fragment size
1393  // slots and run on the CPU.
1394  CHECK(device_type == ExecutorDeviceType::CPU);
1395  max_groups_buffer_entry_guess = compute_buffer_entry_guess(query_infos);
1396  }
1397 
1398  int8_t crt_min_byte_width{get_min_byte_width()};
1399  do {
1400  SharedKernelContext shared_context(query_infos);
1401  ColumnFetcher column_fetcher(this, column_cache);
1402  auto query_comp_desc_owned = std::make_unique<QueryCompilationDescriptor>();
1403  std::unique_ptr<QueryMemoryDescriptor> query_mem_desc_owned;
1404  if (eo.executor_type == ExecutorType::Native) {
1405  try {
1406  INJECT_TIMER(query_step_compilation);
1407  auto clock_begin = timer_start();
1408  std::lock_guard<std::mutex> compilation_lock(compilation_mutex_);
1409  compilation_queue_time_ms_ += timer_stop(clock_begin);
1410 
1411  query_mem_desc_owned =
1412  query_comp_desc_owned->compile(max_groups_buffer_entry_guess,
1413  crt_min_byte_width,
1414  has_cardinality_estimation,
1415  ra_exe_unit,
1416  query_infos,
1417  column_fetcher,
1418  {device_type,
1419  co.hoist_literals,
1420  co.opt_level,
1422  co.allow_lazy_fetch,
1424  co.explain_type,
1426  eo,
1427  render_info,
1428  this);
1429  CHECK(query_mem_desc_owned);
1430  crt_min_byte_width = query_comp_desc_owned->getMinByteWidth();
1431  } catch (CompilationRetryNoCompaction&) {
1432  crt_min_byte_width = MAX_BYTE_WIDTH_SUPPORTED;
1433  continue;
1434  }
1435  } else {
1436  plan_state_.reset(new PlanState(false, this));
1437  plan_state_->allocateLocalColumnIds(ra_exe_unit.input_col_descs);
1438  CHECK(!query_mem_desc_owned);
1439  query_mem_desc_owned.reset(
1441  }
1442  if (eo.just_explain) {
1443  return executeExplain(*query_comp_desc_owned);
1444  }
1445 
1446  for (const auto target_expr : ra_exe_unit.target_exprs) {
1447  plan_state_->target_exprs_.push_back(target_expr);
1448  }
1449 
1450  if (!eo.just_validate) {
1451  int available_cpus = cpu_threads();
1452  auto available_gpus = get_available_gpus(cat);
1453 
1454  const auto context_count =
1455  get_context_count(device_type, available_cpus, available_gpus.size());
1456  try {
1457  auto kernels = createKernels(shared_context,
1458  ra_exe_unit,
1459  column_fetcher,
1460  query_infos,
1461  eo,
1462  is_agg,
1463  allow_single_frag_table_opt,
1464  context_count,
1465  *query_comp_desc_owned,
1466  *query_mem_desc_owned,
1467  render_info,
1468  available_gpus,
1469  available_cpus);
1470  if (g_use_tbb_pool) {
1471 #ifdef HAVE_TBB
1472  VLOG(1) << "Using TBB thread pool for kernel dispatch.";
1473  launchKernels<threadpool::TbbThreadPool<void>>(shared_context,
1474  std::move(kernels));
1475 #else
1476  throw std::runtime_error(
1477  "This build is not TBB enabled. Restart the server with "
1478  "\"enable-modern-thread-pool\" disabled.");
1479 #endif
1480  } else {
1481  launchKernels<threadpool::FuturesThreadPool<void>>(shared_context,
1482  std::move(kernels));
1483  }
1484  } catch (QueryExecutionError& e) {
1485  if (eo.with_dynamic_watchdog && interrupted_.load() &&
1486  e.getErrorCode() == ERR_OUT_OF_TIME) {
1487  resetInterrupt();
1489  }
1490  if (eo.allow_runtime_query_interrupt && interrupted_.load()) {
1491  resetInterrupt();
1493  }
1495  static_cast<size_t>(crt_min_byte_width << 1) <= sizeof(int64_t)) {
1496  crt_min_byte_width <<= 1;
1497  continue;
1498  }
1499  throw;
1500  }
1501  }
1502  if (is_agg) {
1503  try {
1504  return collectAllDeviceResults(shared_context,
1505  ra_exe_unit,
1506  *query_mem_desc_owned,
1507  query_comp_desc_owned->getDeviceType(),
1508  row_set_mem_owner);
1509  } catch (ReductionRanOutOfSlots&) {
1511  } catch (OverflowOrUnderflow&) {
1512  crt_min_byte_width <<= 1;
1513  continue;
1514  }
1515  }
1516  return resultsUnion(shared_context, ra_exe_unit);
1517 
1518  } while (static_cast<size_t>(crt_min_byte_width) <= sizeof(int64_t));
1519 
1520  return std::make_shared<ResultSet>(std::vector<TargetInfo>{},
1523  nullptr,
1524  this);
1525 }

void Executor::executeWorkUnitPerFragment ( const RelAlgExecutionUnit &  ra_exe_unit,
const InputTableInfo &  table_info,
const CompilationOptions &  co,
const ExecutionOptions &  eo,
const Catalog_Namespace::Catalog &  cat,
PerFragmentCallBack  cb 
)
private

Compiles and dispatches a work unit per fragment, processing the results with the per-fragment callback. Currently used for computing metrics over fragments (metadata); a usage sketch follows the listing below.

Definition at line 1527 of file Execute.cpp.

References CHECK(), CHECK_EQ, CompilationOptions::device_type, Fragmenter_Namespace::TableInfo::fragments, SharedKernelContext::getFragmentResults(), InputTableInfo::info, KernelPerFragment, ExecutionKernel::run(), logger::thread_id(), timer_start(), and timer_stop().

1532  {
1533  const auto ra_exe_unit = addDeletedColumn(ra_exe_unit_in, co);
1534  ColumnCacheMap column_cache;
1535 
1536  std::vector<InputTableInfo> table_infos{table_info};
1537  SharedKernelContext kernel_context(table_infos);
1538 
1539  ColumnFetcher column_fetcher(this, column_cache);
1540  auto query_comp_desc_owned = std::make_unique<QueryCompilationDescriptor>();
1541  std::unique_ptr<QueryMemoryDescriptor> query_mem_desc_owned;
1542  {
1543  auto clock_begin = timer_start();
1544  std::lock_guard<std::mutex> compilation_lock(compilation_mutex_);
1545  compilation_queue_time_ms_ += timer_stop(clock_begin);
1546  query_mem_desc_owned =
1547  query_comp_desc_owned->compile(0,
1548  8,
1549  /*has_cardinality_estimation=*/false,
1550  ra_exe_unit,
1551  table_infos,
1552  column_fetcher,
1553  co,
1554  eo,
1555  nullptr,
1556  this);
1557  }
1558  CHECK(query_mem_desc_owned);
1559  CHECK_EQ(size_t(1), ra_exe_unit.input_descs.size());
1560  const auto table_id = ra_exe_unit.input_descs[0].getTableId();
1561  const auto& outer_fragments = table_info.info.fragments;
1562 
1563  {
1564  auto clock_begin = timer_start();
1565  std::lock_guard<std::mutex> kernel_lock(kernel_mutex_);
1566  kernel_queue_time_ms_ += timer_stop(clock_begin);
1567 
1568  for (size_t fragment_index = 0; fragment_index < outer_fragments.size();
1569  ++fragment_index) {
1570  // We may want to consider in the future allowing this to execute on devices other
1571  // than CPU
1572  FragmentsList fragments_list{{table_id, {fragment_index}}};
1573  ExecutionKernel kernel(ra_exe_unit,
1574  co.device_type,
1575  /*device_id=*/0,
1576  eo,
1577  column_fetcher,
1578  *query_comp_desc_owned,
1579  *query_mem_desc_owned,
1580  fragments_list,
1581  ExecutorDispatchMode::KernelPerFragment,
1582  /*render_info=*/nullptr,
1583  /*rowid_lookup_key=*/-1,
1584  logger::thread_id());
1585  kernel.run(this, kernel_context);
1586  }
1587  }
1588 
1589  const auto& all_fragment_results = kernel_context.getFragmentResults();
1590 
1591  for (size_t fragment_index = 0; fragment_index < outer_fragments.size();
1592  ++fragment_index) {
1593  const auto fragment_results = all_fragment_results[fragment_index];
1594  cb(fragment_results.first, outer_fragments[fragment_index]);
1595  }
1596 }
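
For orientation, a minimal sketch of driving this member with a per-fragment callback. The lambda signature mirrors the cb(fragment_results.first, outer_fragments[fragment_index]) call above; the exact PerFragmentCallBack typedef, the rowCount() accessor, and the caller's access to this private member (in practice it is invoked from executor-internal code such as the metadata recomputation path) are assumptions, not taken from this page.

#include "Execute.h"  // Executor, PerFragmentCallBack, ResultSetPtr (assumed available)

// Sketch only: collect the number of rows produced for each outer fragment.
void count_rows_per_fragment(Executor* executor,
                             const RelAlgExecutionUnit& ra_exe_unit,
                             const InputTableInfo& table_info,
                             const CompilationOptions& co,
                             const ExecutionOptions& eo,
                             const Catalog_Namespace::Catalog& cat,
                             std::vector<size_t>& row_counts) {
  Executor::PerFragmentCallBack cb =
      [&row_counts](ResultSetPtr results,
                    const Fragmenter_Namespace::FragmentInfo& fragment_info) {
        // One invocation per outer fragment: record the rows produced for it.
        row_counts.push_back(results ? results->rowCount() : 0);
        (void)fragment_info;  // fragment metadata (id, physical num rows, ...)
      };
  // Assumes the caller may reach this private member (e.g. friend/internal code).
  executor->executeWorkUnitPerFragment(ra_exe_unit, table_info, co, eo, cat, cb);
}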

FetchResult Executor::fetchChunks ( const ColumnFetcher &  column_fetcher,
const RelAlgExecutionUnit &  ra_exe_unit,
const int  device_id,
const Data_Namespace::MemoryLevel  memory_level,
const std::map< int, const TableFragments * > &  all_tables_fragments,
const FragmentsList &  selected_fragments,
const Catalog_Namespace::Catalog &  cat,
std::list< ChunkIter > &  chunk_iterators,
std::list< std::shared_ptr< Chunk_NS::Chunk >> &  chunks,
DeviceAllocator *  device_allocator 
)
private

Definition at line 2310 of file Execute.cpp.

References cat(), CHECK(), CHECK_EQ, CHECK_LT, Data_Namespace::CPU_LEVEL, DEBUG_TIMER, g_enable_dynamic_watchdog, g_enable_runtime_query_interrupt, ColumnFetcher::getAllTableColumnFragments(), ColumnFetcher::getOneTableColumnFragment(), ColumnFetcher::getResultSetColumn(), INJECT_TIMER, RelAlgExecutionUnit::input_col_descs, RelAlgExecutionUnit::input_descs, RESULT, and anonymous_namespace{Execute.cpp}::try_get_column_descriptor().

2320  {
2321  auto timer = DEBUG_TIMER(__func__);
2322  INJECT_TIMER(fetchChunks);
2323  const auto& col_global_ids = ra_exe_unit.input_col_descs;
2324  std::vector<std::vector<size_t>> selected_fragments_crossjoin;
2325  std::vector<size_t> local_col_to_frag_pos;
2326  buildSelectedFragsMapping(selected_fragments_crossjoin,
2327  local_col_to_frag_pos,
2328  col_global_ids,
2329  selected_fragments,
2330  ra_exe_unit);
2331 
2332  CartesianProduct<std::vector<std::vector<size_t>>> frag_ids_crossjoin(
2333  selected_fragments_crossjoin);
2334 
2335  std::vector<std::vector<const int8_t*>> all_frag_col_buffers;
2336  std::vector<std::vector<int64_t>> all_num_rows;
2337  std::vector<std::vector<uint64_t>> all_frag_offsets;
2338 
2339  for (const auto& selected_frag_ids : frag_ids_crossjoin) {
2340  std::vector<const int8_t*> frag_col_buffers(
2341  plan_state_->global_to_local_col_ids_.size());
2342  for (const auto& col_id : col_global_ids) {
2343  // check whether the interrupt flag turns on (non kernel-time query interrupt)
2344  if ((g_enable_dynamic_watchdog || g_enable_runtime_query_interrupt) &&
2345  interrupted_.load()) {
2346  resetInterrupt();
2347  throw QueryExecutionError(ERR_INTERRUPTED);
2348  }
2349  CHECK(col_id);
2350  const int table_id = col_id->getScanDesc().getTableId();
2351  const auto cd = try_get_column_descriptor(col_id.get(), cat);
2352  if (cd && cd->isVirtualCol) {
2353  CHECK_EQ("rowid", cd->columnName);
2354  continue;
2355  }
2356  const auto fragments_it = all_tables_fragments.find(table_id);
2357  CHECK(fragments_it != all_tables_fragments.end());
2358  const auto fragments = fragments_it->second;
2359  auto it = plan_state_->global_to_local_col_ids_.find(*col_id);
2360  CHECK(it != plan_state_->global_to_local_col_ids_.end());
2361  CHECK_LT(static_cast<size_t>(it->second),
2362  plan_state_->global_to_local_col_ids_.size());
2363  const size_t frag_id = selected_frag_ids[local_col_to_frag_pos[it->second]];
2364  if (!fragments->size()) {
2365  return {};
2366  }
2367  CHECK_LT(frag_id, fragments->size());
2368  auto memory_level_for_column = memory_level;
2369  if (plan_state_->columns_to_fetch_.find(
2370  std::make_pair(col_id->getScanDesc().getTableId(), col_id->getColId())) ==
2371  plan_state_->columns_to_fetch_.end()) {
2372  memory_level_for_column = Data_Namespace::CPU_LEVEL;
2373  }
2374  if (col_id->getScanDesc().getSourceType() == InputSourceType::RESULT) {
2375  frag_col_buffers[it->second] = column_fetcher.getResultSetColumn(
2376  col_id.get(), memory_level_for_column, device_id, device_allocator);
2377  } else {
2378  if (needFetchAllFragments(*col_id, ra_exe_unit, selected_fragments)) {
2379  frag_col_buffers[it->second] =
2380  column_fetcher.getAllTableColumnFragments(table_id,
2381  col_id->getColId(),
2382  all_tables_fragments,
2383  memory_level_for_column,
2384  device_id,
2385  device_allocator);
2386  } else {
2387  frag_col_buffers[it->second] =
2388  column_fetcher.getOneTableColumnFragment(table_id,
2389  frag_id,
2390  col_id->getColId(),
2391  all_tables_fragments,
2392  chunks,
2393  chunk_iterators,
2394  memory_level_for_column,
2395  device_id,
2396  device_allocator);
2397  }
2398  }
2399  }
2400  all_frag_col_buffers.push_back(frag_col_buffers);
2401  }
2402  std::tie(all_num_rows, all_frag_offsets) = getRowCountAndOffsetForAllFrags(
2403  ra_exe_unit, frag_ids_crossjoin, ra_exe_unit.input_descs, all_tables_fragments);
2404  return {all_frag_col_buffers, all_num_rows, all_frag_offsets};
2405 }
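
For intuition, a small standalone sketch of the fragment-id cross join that frag_ids_crossjoin represents above: one fragment index is picked per input position, and each combination drives one round of per-column buffer fetches. The cartesian_product() helper below is a simplified stand-in written for this example, not the executor's CartesianProduct implementation.

#include <cstddef>
#include <iostream>
#include <vector>

// Enumerate every combination that picks one fragment id from each inner vector.
std::vector<std::vector<size_t>> cartesian_product(
    const std::vector<std::vector<size_t>>& selected_fragments_crossjoin) {
  std::vector<std::vector<size_t>> result{{}};
  for (const auto& frag_ids : selected_fragments_crossjoin) {
    std::vector<std::vector<size_t>> next;
    for (const auto& prefix : result) {
      for (const size_t frag_id : frag_ids) {
        auto combo = prefix;
        combo.push_back(frag_id);
        next.push_back(std::move(combo));
      }
    }
    result = std::move(next);
  }
  return result;
}

int main() {
  // Two input positions: the first selects fragments {0, 1}, the second {7}.
  const std::vector<std::vector<size_t>> selected = {{0, 1}, {7}};
  for (const auto& selected_frag_ids : cartesian_product(selected)) {
    // Each combination corresponds to one iteration of the
    // "for (const auto& selected_frag_ids : frag_ids_crossjoin)" loop above.
    for (const size_t frag_id : selected_frag_ids) {
      std::cout << frag_id << ' ';
    }
    std::cout << '\n';  // prints "0 7" then "1 7"
  }
}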

FetchResult Executor::fetchUnionChunks ( const ColumnFetcher &  column_fetcher,
const RelAlgExecutionUnit &  ra_exe_unit,
const int  device_id,
const Data_Namespace::MemoryLevel  memory_level,
const std::map< int, const TableFragments * > &  all_tables_fragments,
const FragmentsList &  selected_fragments,
const Catalog_Namespace::Catalog &  cat,
std::list< ChunkIter > &  chunk_iterators,
std::list< std::shared_ptr< Chunk_NS::Chunk >> &  chunks,
DeviceAllocator *  device_allocator 
)
private

Definition at line 2409 of file Execute.cpp.

References cat(), CHECK(), CHECK_EQ, CHECK_LE, CHECK_LT, Data_Namespace::CPU_LEVEL, DEBUG_TIMER, ColumnFetcher::getAllTableColumnFragments(), ColumnFetcher::getOneTableColumnFragment(), ColumnFetcher::getResultSetColumn(), INJECT_TIMER, RelAlgExecutionUnit::input_col_descs, RelAlgExecutionUnit::input_descs, num_rows, shared::printContainer(), RESULT, anonymous_namespace{Execute.cpp}::try_get_column_descriptor(), RelAlgExecutionUnit::union_all, and VLOG.

2419  {
2420  auto timer = DEBUG_TIMER(__func__);
2421  INJECT_TIMER(fetchUnionChunks);
2422 
2423  std::vector<std::vector<const int8_t*>> all_frag_col_buffers;
2424  std::vector<std::vector<int64_t>> all_num_rows;
2425  std::vector<std::vector<uint64_t>> all_frag_offsets;
2426 
2427  CHECK(!selected_fragments.empty());
2428  CHECK_LE(2u, ra_exe_unit.input_descs.size());
2429  CHECK_LE(2u, ra_exe_unit.input_col_descs.size());
2430  using TableId = int;
2431  TableId const selected_table_id = selected_fragments.front().table_id;
2432  bool const input_descs_index =
2433  selected_table_id == ra_exe_unit.input_descs[1].getTableId();
2434  if (!input_descs_index) {
2435  CHECK_EQ(selected_table_id, ra_exe_unit.input_descs[0].getTableId());
2436  }
2437  bool const input_col_descs_index =
2438  selected_table_id ==
2439  (*std::next(ra_exe_unit.input_col_descs.begin()))->getScanDesc().getTableId();
2440  if (!input_col_descs_index) {
2441  CHECK_EQ(selected_table_id,
2442  ra_exe_unit.input_col_descs.front()->getScanDesc().getTableId());
2443  }
2444  VLOG(2) << "selected_fragments.size()=" << selected_fragments.size()
2445  << " selected_table_id=" << selected_table_id
2446  << " input_descs_index=" << int(input_descs_index)
2447  << " input_col_descs_index=" << int(input_col_descs_index)
2448  << " ra_exe_unit.input_descs="
2449  << shared::printContainer(ra_exe_unit.input_descs)
2450  << " ra_exe_unit.input_col_descs="
2451  << shared::printContainer(ra_exe_unit.input_col_descs);
2452 
2453  // Partition col_global_ids by table_id
2454  std::unordered_map<TableId, std::list<std::shared_ptr<const InputColDescriptor>>>
2455  table_id_to_input_col_descs;
2456  for (auto const& input_col_desc : ra_exe_unit.input_col_descs) {
2457  TableId const table_id = input_col_desc->getScanDesc().getTableId();
2458  table_id_to_input_col_descs[table_id].push_back(input_col_desc);
2459  }
2460  for (auto const& pair : table_id_to_input_col_descs) {
2461  std::vector<std::vector<size_t>> selected_fragments_crossjoin;
2462  std::vector<size_t> local_col_to_frag_pos;
2463 
2464  buildSelectedFragsMappingForUnion(selected_fragments_crossjoin,
2465  local_col_to_frag_pos,
2466  pair.second,
2467  selected_fragments,
2468  ra_exe_unit);
2469 
2470  CartesianProduct<std::vector<std::vector<size_t>>> frag_ids_crossjoin(
2471  selected_fragments_crossjoin);
2472 
2473  for (const auto& selected_frag_ids : frag_ids_crossjoin) {
2474  std::vector<const int8_t*> frag_col_buffers(
2475  plan_state_->global_to_local_col_ids_.size());
2476  for (const auto& col_id : pair.second) {
2477  CHECK(col_id);
2478  const int table_id = col_id->getScanDesc().getTableId();
2479  CHECK_EQ(table_id, pair.first);
2480  const auto cd = try_get_column_descriptor(col_id.get(), cat);
2481  if (cd && cd->isVirtualCol) {
2482  CHECK_EQ("rowid", cd->columnName);
2483  continue;
2484  }
2485  const auto fragments_it = all_tables_fragments.find(table_id);
2486  CHECK(fragments_it != all_tables_fragments.end());
2487  const auto fragments = fragments_it->second;
2488  auto it = plan_state