24 : executor_(executor), columnarized_table_cache_(column_cache) {}
36 std::vector<std::shared_ptr<Chunk_NS::Chunk>>& chunks_owner,
38 static std::mutex columnar_conversion_mutex;
45 const auto& catalog = *executor->getCatalog();
48 CHECK(!cd || !(cd->isVirtualCol));
49 const int8_t* col_buff =
nullptr;
51 ChunkKey chunk_key{catalog.getCurrentDB().dbId,
57 &catalog.getDataMgr(),
61 chunk_meta_it->second->numBytes,
62 chunk_meta_it->second->numElements);
63 chunks_owner.push_back(chunk);
65 auto ab = chunk->getBuffer();
66 CHECK(ab->getMemoryPtr());
67 col_buff =
reinterpret_cast<int8_t*
>(ab->getMemoryPtr());
71 std::lock_guard<std::mutex> columnar_conversion_guard(columnar_conversion_mutex);
73 if (column_cache.empty() || !column_cache.count(table_id)) {
74 column_cache.insert(std::make_pair(
75 table_id, std::unordered_map<
int, std::shared_ptr<const ColumnarResults>>()));
77 auto& frag_id_to_result = column_cache[table_id];
78 if (frag_id_to_result.empty() || !frag_id_to_result.count(frag_id)) {
79 frag_id_to_result.insert(
80 std::make_pair(frag_id,
82 executor->row_set_mem_owner_,
86 col_frag = column_cache[table_id][frag_id].get();
91 &catalog.getDataMgr(),
111 const std::vector<Fragmenter_Namespace::FragmentInfo>& fragments,
115 std::vector<std::shared_ptr<Chunk_NS::Chunk>>& chunks_owner,
116 std::vector<std::shared_ptr<void>>& malloc_owner,
118 CHECK(!fragments.empty());
120 size_t col_chunks_buff_sz =
sizeof(
struct JoinChunk) * fragments.size();
122 auto col_chunks_buff =
reinterpret_cast<int8_t*
>(
123 malloc_owner.emplace_back(
checked_malloc(col_chunks_buff_sz), free).get());
124 auto join_chunk_array =
reinterpret_cast<struct
JoinChunk*
>(col_chunks_buff);
127 size_t num_chunks = 0;
128 for (
auto& frag : fragments) {
139 num_elems += elem_count;
150 return {col_chunks_buff,
154 static_cast<size_t>(elem_sz)};
161 const std::map<int, const TableFragments*>& all_tables_fragments,
162 std::list<std::shared_ptr<Chunk_NS::Chunk>>& chunk_holder,
163 std::list<ChunkIter>& chunk_iter_holder,
167 static std::mutex varlen_chunk_mutex;
168 static std::mutex chunk_list_mutex;
169 const auto fragments_it = all_tables_fragments.find(table_id);
170 CHECK(fragments_it != all_tables_fragments.end());
171 const auto fragments = fragments_it->second;
172 const auto& fragment = (*fragments)[frag_id];
173 if (fragment.isEmptyPhysicalFragment()) {
176 std::shared_ptr<Chunk_NS::Chunk> chunk;
177 auto chunk_meta_it = fragment.getChunkMetadataMap().find(col_id);
178 CHECK(chunk_meta_it != fragment.getChunkMetadataMap().end());
183 const auto col_type =
185 const bool is_real_string =
186 col_type.is_string() && col_type.get_compression() ==
kENCODING_NONE;
187 const bool is_varlen =
192 cat.getCurrentDB().dbId, fragment.physicalTableId, col_id, fragment.fragmentId};
193 std::unique_ptr<std::lock_guard<std::mutex>> varlen_chunk_lock;
195 varlen_chunk_lock.reset(
new std::lock_guard<std::mutex>(varlen_chunk_mutex));
203 chunk_meta_it->second->numBytes,
204 chunk_meta_it->second->numElements);
205 std::lock_guard<std::mutex> chunk_list_lock(chunk_list_mutex);
206 chunk_holder.push_back(chunk);
210 CHECK(chunk_meta_it != fragment.getChunkMetadataMap().end());
211 chunk_iter_holder.push_back(chunk->begin_iterator(chunk_meta_it->second));
212 auto& chunk_iter = chunk_iter_holder.back();
214 return reinterpret_cast<int8_t*
>(&chunk_iter);
216 auto ab = chunk->getBuffer();
218 auto& row_set_mem_owner =
executor_->getRowSetMemoryOwner();
219 row_set_mem_owner->addVarlenInputBuffer(ab);
224 chunk_iter_gpu, reinterpret_cast<int8_t*>(&chunk_iter),
sizeof(
ChunkIter));
225 return chunk_iter_gpu;
228 auto ab = chunk->getBuffer();
229 CHECK(ab->getMemoryPtr());
230 return ab->getMemoryPtr();
237 const std::map<int, const TableFragments*>& all_tables_fragments,
241 const auto fragments_it = all_tables_fragments.find(table_id);
242 CHECK(fragments_it != all_tables_fragments.end());
243 const auto fragments = fragments_it->second;
244 const auto frag_count = fragments->size();
245 std::vector<std::unique_ptr<ColumnarResults>> column_frags;
253 for (
size_t frag_id = 0; frag_id < frag_count; ++frag_id) {
254 std::list<std::shared_ptr<Chunk_NS::Chunk>> chunk_holder;
255 std::list<ChunkIter> chunk_iter_holder;
256 const auto& fragment = (*fragments)[frag_id];
257 if (fragment.isEmptyPhysicalFragment()) {
260 auto chunk_meta_it = fragment.getChunkMetadataMap().find(col_id);
261 CHECK(chunk_meta_it != fragment.getChunkMetadataMap().end());
263 static_cast<int>(frag_id),
265 all_tables_fragments,
271 column_frags.push_back(
272 std::make_unique<ColumnarResults>(
executor_->row_set_mem_owner_,
274 fragment.getNumTuples(),
275 chunk_meta_it->second->sqlType));
277 auto merged_results =
279 table_column = merged_results.get();
282 table_column = column_it->second.get();
315 if (!columnar_results) {
319 CHECK_LT(static_cast<size_t>(col_id), col_buffers.size());
321 const auto& col_ti = columnar_results->
getColumnType(col_id);
322 const auto num_bytes = columnar_results->
size() * col_ti.get_size();
323 CHECK(device_allocator);
324 auto gpu_col_buffer = device_allocator->
alloc(num_bytes);
325 device_allocator->
copyToDevice(gpu_col_buffer, col_buffers[col_id], num_bytes);
326 return gpu_col_buffer;
328 return col_buffers[col_id];
343 table_id, std::unordered_map<
int, std::shared_ptr<const ColumnarResults>>()));
347 if (frag_id_to_result.empty() || !frag_id_to_result.count(frag_id)) {
348 frag_id_to_result.insert(
349 std::make_pair(frag_id,
351 executor_->row_set_mem_owner_, buffer, frag_id))));
size_t getNumTuples() const
std::vector< int > ChunkKey
HOST DEVICE int get_size() const
static std::unique_ptr< ColumnarResults > mergeResults(const std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner, const std::vector< std::unique_ptr< ColumnarResults >> &sub_results)
virtual void copyToDevice(int8_t *device_dst, const int8_t *host_src, const size_t num_bytes) const =0
const SQLTypeInfo get_column_type(const int col_id, const int table_id, const ColumnDescriptor *cd, const TemporaryTables *temporary_tables)
virtual int8_t * alloc(const size_t num_bytes)=0
ColumnCacheMap columnarized_table_cache_
std::shared_ptr< ResultSet > ResultSetPtr
bool isEmptyPhysicalFragment() const
static JoinColumn makeJoinColumn(Executor *executor, const Analyzer::ColumnVar &hash_col, const std::vector< Fragmenter_Namespace::FragmentInfo > &fragments, const Data_Namespace::MemoryLevel effective_mem_lvl, const int device_id, DeviceAllocator *device_allocator, std::vector< std::shared_ptr< Chunk_NS::Chunk >> &chunks_owner, std::vector< std::shared_ptr< void >> &malloc_owner, ColumnCacheMap &column_cache)
Creates a JoinColumn struct containing an array of JoinChunk structs.
std::unordered_map< InputColDescriptor, std::unique_ptr< const ColumnarResults > > columnarized_scan_table_cache_
const int8_t * getOneTableColumnFragment(const int table_id, const int frag_id, const int col_id, const std::map< int, const TableFragments * > &all_tables_fragments, std::list< std::shared_ptr< Chunk_NS::Chunk >> &chunk_holder, std::list< ChunkIter > &chunk_iter_holder, const Data_Namespace::MemoryLevel memory_level, const int device_id, DeviceAllocator *device_allocator) const
const ResultSetPtr & get_temporary_table(const TemporaryTables *temporary_tables, const int table_id)
std::mutex columnar_conversion_mutex_
const int8_t * getResultSetColumn(const InputColDescriptor *col_desc, const Data_Namespace::MemoryLevel memory_level, const int device_id, DeviceAllocator *device_allocator) const
const ColumnDescriptor * get_column_descriptor_maybe(const int col_id, const int table_id, const Catalog_Namespace::Catalog &cat)
void * checked_malloc(const size_t size)
const size_t size() const
Used by Fragmenter classes to store info about each fragment - the fragment id and number of tuples(r...
static std::shared_ptr< Chunk > getChunk(const ColumnDescriptor *cd, DataMgr *data_mgr, const ChunkKey &key, const MemoryLevel mem_level, const int deviceId, const size_t num_bytes, const size_t num_elems)
ColumnFetcher(Executor *executor, const ColumnCacheMap &column_cache)
const ChunkMetadataMap & getChunkMetadataMap() const
const int8_t * getAllTableColumnFragments(const int table_id, const int col_id, const std::map< int, const TableFragments * > &all_tables_fragments, const Data_Namespace::MemoryLevel memory_level, const int device_id, DeviceAllocator *device_allocator) const
const SQLTypeInfo & get_type_info() const
std::unordered_map< int, std::unordered_map< int, std::shared_ptr< const ColumnarResults >>> ColumnCacheMap
static const int8_t * transferColumnIfNeeded(const ColumnarResults *columnar_results, const int col_id, Data_Namespace::DataMgr *data_mgr, const Data_Namespace::MemoryLevel memory_level, const int device_id, DeviceAllocator *device_allocator)
const ColumnarResults * columnarize_result(std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner, const ResultSetPtr &result, const int frag_id)
static std::pair< const int8_t *, size_t > getOneColumnFragment(Executor *executor, const Analyzer::ColumnVar &hash_col, const Fragmenter_Namespace::FragmentInfo &fragment, const Data_Namespace::MemoryLevel effective_mem_lvl, const int device_id, DeviceAllocator *device_allocator, std::vector< std::shared_ptr< Chunk_NS::Chunk >> &chunks_owner, ColumnCacheMap &column_cache)
Gets one chunk's pointer and element count on either CPU or GPU.
int get_column_id() const
const std::vector< int8_t * > & getColumnBuffers() const
const ColumnDescriptor * get_column_descriptor(const int col_id, const int table_id, const Catalog_Namespace::Catalog &cat)
const SQLTypeInfo & getColumnType(const int col_id) const