OmniSciDB  fe05a0c208
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
ColumnFetcher Class Reference

#include <ColumnFetcher.h>

+ Collaboration diagram for ColumnFetcher:

Public Member Functions

 ColumnFetcher (Executor *executor, const ColumnCacheMap &column_cache)
 
const int8_t * getOneTableColumnFragment (const int table_id, const int frag_id, const int col_id, const std::map< int, const TableFragments * > &all_tables_fragments, std::list< std::shared_ptr< Chunk_NS::Chunk >> &chunk_holder, std::list< ChunkIter > &chunk_iter_holder, const Data_Namespace::MemoryLevel memory_level, const int device_id, DeviceAllocator *device_allocator) const
 
const int8_t * getAllTableColumnFragments (const int table_id, const int col_id, const std::map< int, const TableFragments * > &all_tables_fragments, const Data_Namespace::MemoryLevel memory_level, const int device_id, DeviceAllocator *device_allocator, const size_t thread_idx) const
 
const int8_t * getResultSetColumn (const InputColDescriptor *col_desc, const Data_Namespace::MemoryLevel memory_level, const int device_id, DeviceAllocator *device_allocator, const size_t thread_idx) const
 
const int8_t * linearizeColumnFragments (const int table_id, const int col_id, const std::map< int, const TableFragments * > &all_tables_fragments, std::list< std::shared_ptr< Chunk_NS::Chunk >> &chunk_holder, std::list< ChunkIter > &chunk_iter_holder, const Data_Namespace::MemoryLevel memory_level, const int device_id, DeviceAllocator *device_allocator, const size_t thread_idx) const
 

Static Public Member Functions

static std::pair< const int8_t
*, size_t > 
getOneColumnFragment (Executor *executor, const Analyzer::ColumnVar &hash_col, const Fragmenter_Namespace::FragmentInfo &fragment, const Data_Namespace::MemoryLevel effective_mem_lvl, const int device_id, DeviceAllocator *device_allocator, const size_t thread_idx, std::vector< std::shared_ptr< Chunk_NS::Chunk >> &chunks_owner, ColumnCacheMap &column_cache)
 Gets one chunk's pointer and element count on either CPU or GPU. More...
 
static JoinColumn makeJoinColumn (Executor *executor, const Analyzer::ColumnVar &hash_col, const std::vector< Fragmenter_Namespace::FragmentInfo > &fragments, const Data_Namespace::MemoryLevel effective_mem_lvl, const int device_id, DeviceAllocator *device_allocator, const size_t thread_idx, std::vector< std::shared_ptr< Chunk_NS::Chunk >> &chunks_owner, std::vector< std::shared_ptr< void >> &malloc_owner, ColumnCacheMap &column_cache)
 Creates a JoinColumn struct containing an array of JoinChunk structs. More...
 

Private Types

using CacheKey = std::vector< int >
 
using DeviceMergedChunkMap = std::unordered_map< int, std::shared_ptr< Chunk_NS::Chunk >>
 
using DeviceMergedChunkIterMap = std::unordered_map< int, int8_t * >
 

Private Member Functions

void addMergedChunk (const InputColDescriptor col_desc, const int device_id, std::shared_ptr< Chunk_NS::Chunk > chunk_ptr, int8_t *chunk_iter_ptr) const
 
const int8_t * getChunkiter (const InputColDescriptor col_desc, const int device_id=0) const
 
ChunkIter prepareChunkIter (AbstractBuffer *merged, ChunkIter &chunk_iter, const size_t total_num_tuples) const
 
const int8_t * getResultSetColumn (const ResultSetPtr &buffer, const int table_id, const int col_id, const Data_Namespace::MemoryLevel memory_level, const int device_id, DeviceAllocator *device_allocator, const size_t thread_idx) const
 

Static Private Member Functions

static const int8_t * transferColumnIfNeeded (const ColumnarResults *columnar_results, const int col_id, Data_Namespace::DataMgr *data_mgr, const Data_Namespace::MemoryLevel memory_level, const int device_id, DeviceAllocator *device_allocator)
 

Private Attributes

Executor * executor_
 
std::mutex columnar_fetch_mutex_
 
ColumnCacheMap columnarized_table_cache_
 
std::unordered_map
< InputColDescriptor,
std::unordered_map< CacheKey,
std::unique_ptr< const
ColumnarResults > > > 
columnarized_ref_table_cache_
 
std::unordered_map
< InputColDescriptor,
std::unique_ptr< const
ColumnarResults > > 
columnarized_scan_table_cache_
 
std::unordered_map
< InputColDescriptor,
DeviceMergedChunkMap > 
linearized_multi_frag_table_cache_
 
std::unordered_map
< InputColDescriptor,
DeviceMergedChunkIterMap > 
linearized_multi_frag_chunk_iter_cache_
 

Friends

class QueryCompilationDescriptor
 
class TableFunctionExecutionContext
 

Detailed Description

Definition at line 47 of file ColumnFetcher.h.

Member Typedef Documentation

using ColumnFetcher::CacheKey = std::vector<int>
private

Definition at line 143 of file ColumnFetcher.h.

using ColumnFetcher::DeviceMergedChunkIterMap = std::unordered_map<int, int8_t*>
private

Definition at line 155 of file ColumnFetcher.h.

using ColumnFetcher::DeviceMergedChunkMap = std::unordered_map<int, std::shared_ptr<Chunk_NS::Chunk>>
private

Definition at line 152 of file ColumnFetcher.h.

Constructor & Destructor Documentation

ColumnFetcher::ColumnFetcher ( Executor *  executor,
const ColumnCacheMap &  column_cache 
)

Definition at line 44 of file ColumnFetcher.cpp.

45  : executor_(executor), columnarized_table_cache_(column_cache) {}
ColumnCacheMap columnarized_table_cache_
Executor * executor_

Member Function Documentation

void ColumnFetcher::addMergedChunk ( const InputColDescriptor  col_desc,
const int  device_id,
std::shared_ptr< Chunk_NS::Chunk >  chunk_ptr,
int8_t *  chunk_iter_ptr 
) const
private

Definition at line 493 of file ColumnFetcher.cpp.

References InputColDescriptor::getColId(), InputColDescriptor::getScanDesc(), InputDescriptor::getTableId(), linearized_multi_frag_chunk_iter_cache_, linearized_multi_frag_table_cache_, and VLOG.

Referenced by linearizeColumnFragments().

496  {
497  // 1. merged_chunk_ptr
498  auto chunk_it = linearized_multi_frag_table_cache_.find(col_desc);
499  if (chunk_it != linearized_multi_frag_table_cache_.end()) {
500  auto chunk_device_it = chunk_it->second.find(device_id);
501  if (chunk_device_it == chunk_it->second.end()) {
502  VLOG(2) << "Additional merged chunk for col_desc (tbl: "
503  << col_desc.getScanDesc().getTableId() << ", col: " << col_desc.getColId()
504  << "), device: " << device_id;
505  chunk_it->second.emplace(device_id, chunk_ptr);
506  }
507  } else {
508  DeviceMergedChunkMap chunk_m;
509  chunk_m.emplace(device_id, chunk_ptr);
510  VLOG(2) << "New merged chunk for col_desc (tbl: "
511  << col_desc.getScanDesc().getTableId() << ", col: " << col_desc.getColId()
512  << "), device: " << device_id;
513  linearized_multi_frag_table_cache_.emplace(col_desc, chunk_m);
514  }
515 
516  // 2. merged_chunk_iter_ptr
517  auto iter_it = linearized_multi_frag_chunk_iter_cache_.find(col_desc);
518  if (iter_it != linearized_multi_frag_chunk_iter_cache_.end()) {
519  auto iter_device_it = iter_it->second.find(device_id);
520  if (iter_device_it == iter_it->second.end()) {
521  VLOG(2) << "Additional merged chunk_iter for col_desc (tbl: "
522  << col_desc.getScanDesc().getTableId() << ", col: " << col_desc.getColId()
523  << "), device: " << device_id;
524  iter_it->second.emplace(device_id, chunk_iter_ptr);
525  }
526  } else {
527  DeviceMergedChunkIterMap iter_m;
528  iter_m.emplace(device_id, chunk_iter_ptr);
529  VLOG(2) << "New merged chunk_iter for col_desc (tbl: "
530  << col_desc.getScanDesc().getTableId() << ", col: " << col_desc.getColId()
531  << "), device: " << device_id;
532  linearized_multi_frag_chunk_iter_cache_.emplace(col_desc, iter_m);
533  }
534 }
std::unordered_map< int, std::shared_ptr< Chunk_NS::Chunk >> DeviceMergedChunkMap
std::unordered_map< int, int8_t * > DeviceMergedChunkIterMap
int getColId() const
int getTableId() const
std::unordered_map< InputColDescriptor, DeviceMergedChunkIterMap > linearized_multi_frag_chunk_iter_cache_
const InputDescriptor & getScanDesc() const
#define VLOG(n)
Definition: Logger.h:297
std::unordered_map< InputColDescriptor, DeviceMergedChunkMap > linearized_multi_frag_table_cache_

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

const int8_t * ColumnFetcher::getAllTableColumnFragments ( const int  table_id,
const int  col_id,
const std::map< int, const TableFragments * > &  all_tables_fragments,
const Data_Namespace::MemoryLevel  memory_level,
const int  device_id,
DeviceAllocator *  device_allocator,
const size_t  thread_idx 
) const

Definition at line 268 of file ColumnFetcher.cpp.

References CHECK, check_interrupt(), columnar_fetch_mutex_, columnarized_scan_table_cache_, Data_Namespace::CPU_LEVEL, Executor::ERR_INTERRUPTED, executor_, g_enable_non_kernel_time_query_interrupt, getOneTableColumnFragment(), InputColDescriptor::getScanDesc(), InputDescriptor::getSourceType(), ColumnarResults::mergeResults(), TABLE, and transferColumnIfNeeded().

Referenced by Executor::fetchChunks(), and Executor::fetchUnionChunks().

275  {
276  const auto fragments_it = all_tables_fragments.find(table_id);
277  CHECK(fragments_it != all_tables_fragments.end());
278  const auto fragments = fragments_it->second;
279  const auto frag_count = fragments->size();
280  std::vector<std::unique_ptr<ColumnarResults>> column_frags;
281  const ColumnarResults* table_column = nullptr;
282  const InputColDescriptor col_desc(col_id, table_id, int(0));
283  CHECK(col_desc.getScanDesc().getSourceType() == InputSourceType::TABLE);
284  {
285  std::lock_guard<std::mutex> columnar_conversion_guard(columnar_fetch_mutex_);
286  auto column_it = columnarized_scan_table_cache_.find(col_desc);
287  if (column_it == columnarized_scan_table_cache_.end()) {
288  for (size_t frag_id = 0; frag_id < frag_count; ++frag_id) {
289  if (g_enable_non_kernel_time_query_interrupt && check_interrupt()) {
290  throw QueryExecutionError(Executor::ERR_INTERRUPTED);
291  }
292  std::list<std::shared_ptr<Chunk_NS::Chunk>> chunk_holder;
293  std::list<ChunkIter> chunk_iter_holder;
294  const auto& fragment = (*fragments)[frag_id];
295  if (fragment.isEmptyPhysicalFragment()) {
296  continue;
297  }
298  auto chunk_meta_it = fragment.getChunkMetadataMap().find(col_id);
299  CHECK(chunk_meta_it != fragment.getChunkMetadataMap().end());
300  auto col_buffer = getOneTableColumnFragment(table_id,
301  static_cast<int>(frag_id),
302  col_id,
303  all_tables_fragments,
304  chunk_holder,
305  chunk_iter_holder,
306  Data_Namespace::CPU_LEVEL,
307  int(0),
308  device_allocator);
309  column_frags.push_back(
310  std::make_unique<ColumnarResults>(executor_->row_set_mem_owner_,
311  col_buffer,
312  fragment.getNumTuples(),
313  chunk_meta_it->second->sqlType,
314  thread_idx));
315  }
316  auto merged_results =
317  ColumnarResults::mergeResults(executor_->row_set_mem_owner_, column_frags);
318  table_column = merged_results.get();
319  columnarized_scan_table_cache_.emplace(col_desc, std::move(merged_results));
320  } else {
321  table_column = column_it->second.get();
322  }
323  }
324  return ColumnFetcher::transferColumnIfNeeded(table_column,
325  0,
326  &executor_->getCatalog()->getDataMgr(),
327  memory_level,
328  device_id,
329  device_allocator);
330 }
static const int32_t ERR_INTERRUPTED
Definition: Execute.h:1120
static std::unique_ptr< ColumnarResults > mergeResults(const std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner, const std::vector< std::unique_ptr< ColumnarResults >> &sub_results)
std::mutex columnar_fetch_mutex_
Executor * executor_
bool g_enable_non_kernel_time_query_interrupt
Definition: Execute.cpp:115
std::unordered_map< InputColDescriptor, std::unique_ptr< const ColumnarResults > > columnarized_scan_table_cache_
const int8_t * getOneTableColumnFragment(const int table_id, const int frag_id, const int col_id, const std::map< int, const TableFragments * > &all_tables_fragments, std::list< std::shared_ptr< Chunk_NS::Chunk >> &chunk_holder, std::list< ChunkIter > &chunk_iter_holder, const Data_Namespace::MemoryLevel memory_level, const int device_id, DeviceAllocator *device_allocator) const
__device__ bool check_interrupt()
static const int8_t * transferColumnIfNeeded(const ColumnarResults *columnar_results, const int col_id, Data_Namespace::DataMgr *data_mgr, const Data_Namespace::MemoryLevel memory_level, const int device_id, DeviceAllocator *device_allocator)
#define CHECK(condition)
Definition: Logger.h:203

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

const int8_t * ColumnFetcher::getChunkiter ( const InputColDescriptor  col_desc,
const int  device_id = 0 
) const
private

Definition at line 536 of file ColumnFetcher.cpp.

References InputColDescriptor::getColId(), InputColDescriptor::getScanDesc(), InputDescriptor::getTableId(), linearized_multi_frag_chunk_iter_cache_, and VLOG.

Referenced by linearizeColumnFragments().

537  {
538  auto linearized_chunk_iter_it = linearized_multi_frag_chunk_iter_cache_.find(col_desc);
539  if (linearized_chunk_iter_it != linearized_multi_frag_chunk_iter_cache_.end()) {
540  auto dev_iter_map_it = linearized_chunk_iter_it->second.find(device_id);
541  if (dev_iter_map_it != linearized_chunk_iter_it->second.end()) {
542  VLOG(2) << "Recycle merged chunk_iter for col_desc (tbl: "
543  << col_desc.getScanDesc().getTableId() << ", col: " << col_desc.getColId()
544  << "), device: " << device_id;
545  return dev_iter_map_it->second;
546  }
547  }
548  return nullptr;
549 }
int getColId() const
int getTableId() const
std::unordered_map< InputColDescriptor, DeviceMergedChunkIterMap > linearized_multi_frag_chunk_iter_cache_
const InputDescriptor & getScanDesc() const
#define VLOG(n)
Definition: Logger.h:297

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

std::pair< const int8_t *, size_t > ColumnFetcher::getOneColumnFragment ( Executor *  executor,
const Analyzer::ColumnVar &  hash_col,
const Fragmenter_Namespace::FragmentInfo &  fragment,
const Data_Namespace::MemoryLevel  effective_mem_lvl,
const int  device_id,
DeviceAllocator *  device_allocator,
const size_t  thread_idx,
std::vector< std::shared_ptr< Chunk_NS::Chunk >> &  chunks_owner,
ColumnCacheMap &  column_cache 
)
static

Gets one chunk's pointer and element count on either CPU or GPU.

Gets a column fragment chunk on CPU or on GPU depending on the effective memory level parameter. For temporary tables, the chunk will be copied to the GPU if needed. Returns a buffer pointer and an element count.

Definition at line 50 of file ColumnFetcher.cpp.

References CHECK, anonymous_namespace{ColumnFetcher.cpp}::columnarize_result(), Data_Namespace::CPU_LEVEL, DEBUG_TIMER, Fragmenter_Namespace::FragmentInfo::fragmentId, get_column_descriptor_maybe(), Analyzer::ColumnVar::get_column_id(), Analyzer::ColumnVar::get_table_id(), get_temporary_table(), Chunk_NS::Chunk::getChunk(), Fragmenter_Namespace::FragmentInfo::getChunkMetadataMap(), Fragmenter_Namespace::FragmentInfo::getNumTuples(), Fragmenter_Namespace::FragmentInfo::isEmptyPhysicalFragment(), Fragmenter_Namespace::FragmentInfo::physicalTableId, and transferColumnIfNeeded().

Referenced by RelAlgExecutor::createWindowFunctionContext(), TableFunctionExecutionContext::execute(), and makeJoinColumn().

59  {
60  static std::mutex columnar_conversion_mutex;
61  auto timer = DEBUG_TIMER(__func__);
62  if (fragment.isEmptyPhysicalFragment()) {
63  return {nullptr, 0};
64  }
65  const auto table_id = hash_col.get_table_id();
66  const auto& catalog = *executor->getCatalog();
67  const auto cd =
68  get_column_descriptor_maybe(hash_col.get_column_id(), table_id, catalog);
69  CHECK(!cd || !(cd->isVirtualCol));
70  const int8_t* col_buff = nullptr;
71  if (cd) { // real table
72  /* chunk_meta_it is used here to retrieve chunk numBytes and
73  numElements. Apparently, their values are often zeros. If we
74  knew how to predict the zero values, calling
75  getChunkMetadataMap could be avoided to skip
76  synthesize_metadata calls. */
77  auto chunk_meta_it = fragment.getChunkMetadataMap().find(hash_col.get_column_id());
78  CHECK(chunk_meta_it != fragment.getChunkMetadataMap().end());
79  ChunkKey chunk_key{catalog.getCurrentDB().dbId,
80  fragment.physicalTableId,
81  hash_col.get_column_id(),
82  fragment.fragmentId};
83  const auto chunk = Chunk_NS::Chunk::getChunk(
84  cd,
85  &catalog.getDataMgr(),
86  chunk_key,
87  effective_mem_lvl,
88  effective_mem_lvl == Data_Namespace::CPU_LEVEL ? 0 : device_id,
89  chunk_meta_it->second->numBytes,
90  chunk_meta_it->second->numElements);
91  chunks_owner.push_back(chunk);
92  CHECK(chunk);
93  auto ab = chunk->getBuffer();
94  CHECK(ab->getMemoryPtr());
95  col_buff = reinterpret_cast<int8_t*>(ab->getMemoryPtr());
96  } else { // temporary table
97  const ColumnarResults* col_frag{nullptr};
98  {
99  std::lock_guard<std::mutex> columnar_conversion_guard(columnar_conversion_mutex);
100  const auto frag_id = fragment.fragmentId;
101  if (column_cache.empty() || !column_cache.count(table_id)) {
102  column_cache.insert(std::make_pair(
103  table_id, std::unordered_map<int, std::shared_ptr<const ColumnarResults>>()));
104  }
105  auto& frag_id_to_result = column_cache[table_id];
106  if (frag_id_to_result.empty() || !frag_id_to_result.count(frag_id)) {
107  frag_id_to_result.insert(
108  std::make_pair(frag_id,
109  std::shared_ptr<const ColumnarResults>(columnarize_result(
110  executor->row_set_mem_owner_,
111  get_temporary_table(executor->temporary_tables_, table_id),
112  thread_idx,
113  frag_id))));
114  }
115  col_frag = column_cache[table_id][frag_id].get();
116  }
117  col_buff = transferColumnIfNeeded(
118  col_frag,
119  hash_col.get_column_id(),
120  &catalog.getDataMgr(),
121  effective_mem_lvl,
122  effective_mem_lvl == Data_Namespace::CPU_LEVEL ? 0 : device_id,
123  device_allocator);
124  }
125  return {col_buff, fragment.getNumTuples()};
126 }
int get_table_id() const
Definition: Analyzer.h:194
std::vector< int > ChunkKey
Definition: types.h:37
const ResultSetPtr & get_temporary_table(const TemporaryTables *temporary_tables, const int table_id)
Definition: Execute.h:229
const ColumnDescriptor * get_column_descriptor_maybe(const int col_id, const int table_id, const Catalog_Namespace::Catalog &cat)
Definition: Execute.h:221
static std::shared_ptr< Chunk > getChunk(const ColumnDescriptor *cd, DataMgr *data_mgr, const ChunkKey &key, const MemoryLevel mem_level, const int deviceId, const size_t num_bytes, const size_t num_elems)
Definition: Chunk.cpp:28
const ChunkMetadataMap & getChunkMetadataMap() const
const ColumnarResults * columnarize_result(std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner, const ResultSetPtr &result, const size_t thread_idx, const int frag_id)
static const int8_t * transferColumnIfNeeded(const ColumnarResults *columnar_results, const int col_id, Data_Namespace::DataMgr *data_mgr, const Data_Namespace::MemoryLevel memory_level, const int device_id, DeviceAllocator *device_allocator)
#define CHECK(condition)
Definition: Logger.h:203
#define DEBUG_TIMER(name)
Definition: Logger.h:319
int get_column_id() const
Definition: Analyzer.h:195

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

const int8_t * ColumnFetcher::getOneTableColumnFragment ( const int  table_id,
const int  frag_id,
const int  col_id,
const std::map< int, const TableFragments * > &  all_tables_fragments,
std::list< std::shared_ptr< Chunk_NS::Chunk >> &  chunk_holder,
std::list< ChunkIter > &  chunk_iter_holder,
const Data_Namespace::MemoryLevel  memory_level,
const int  device_id,
DeviceAllocator *  device_allocator 
) const

Definition at line 191 of file ColumnFetcher.cpp.

References Allocator::alloc(), cat(), CHECK, CHECK_EQ, CHECK_GT, DeviceAllocator::copyToDevice(), Data_Namespace::CPU_LEVEL, executor_, get_column_descriptor(), get_column_type(), Chunk_NS::Chunk::getChunk(), Data_Namespace::GPU_LEVEL, and kENCODING_NONE.

Referenced by Executor::fetchChunks(), Executor::fetchUnionChunks(), and getAllTableColumnFragments().

200  {
201  static std::mutex varlen_chunk_mutex; // TODO(alex): remove
202  static std::mutex chunk_list_mutex;
203  const auto fragments_it = all_tables_fragments.find(table_id);
204  CHECK(fragments_it != all_tables_fragments.end());
205  const auto fragments = fragments_it->second;
206  const auto& fragment = (*fragments)[frag_id];
207  if (fragment.isEmptyPhysicalFragment()) {
208  return nullptr;
209  }
210  std::shared_ptr<Chunk_NS::Chunk> chunk;
211  auto chunk_meta_it = fragment.getChunkMetadataMap().find(col_id);
212  CHECK(chunk_meta_it != fragment.getChunkMetadataMap().end());
213  CHECK(table_id > 0);
214  const auto& cat = *executor_->getCatalog();
215  auto cd = get_column_descriptor(col_id, table_id, cat);
216  CHECK(cd);
217  const auto col_type =
218  get_column_type(col_id, table_id, cd, executor_->temporary_tables_);
219  const bool is_real_string =
220  col_type.is_string() && col_type.get_compression() == kENCODING_NONE;
221  const bool is_varlen =
222  is_real_string ||
223  col_type.is_array(); // TODO: should it be col_type.is_varlen_array() ?
224  {
225  ChunkKey chunk_key{
226  cat.getCurrentDB().dbId, fragment.physicalTableId, col_id, fragment.fragmentId};
227  std::unique_ptr<std::lock_guard<std::mutex>> varlen_chunk_lock;
228  if (is_varlen) {
229  varlen_chunk_lock.reset(new std::lock_guard<std::mutex>(varlen_chunk_mutex));
230  }
231  chunk = Chunk_NS::Chunk::getChunk(
232  cd,
233  &cat.getDataMgr(),
234  chunk_key,
235  memory_level,
236  memory_level == Data_Namespace::CPU_LEVEL ? 0 : device_id,
237  chunk_meta_it->second->numBytes,
238  chunk_meta_it->second->numElements);
239  std::lock_guard<std::mutex> chunk_list_lock(chunk_list_mutex);
240  chunk_holder.push_back(chunk);
241  }
242  if (is_varlen) {
243  CHECK_GT(table_id, 0);
244  CHECK(chunk_meta_it != fragment.getChunkMetadataMap().end());
245  chunk_iter_holder.push_back(chunk->begin_iterator(chunk_meta_it->second));
246  auto& chunk_iter = chunk_iter_holder.back();
247  if (memory_level == Data_Namespace::CPU_LEVEL) {
248  return reinterpret_cast<int8_t*>(&chunk_iter);
249  } else {
250  auto ab = chunk->getBuffer();
251  ab->pin();
252  auto& row_set_mem_owner = executor_->getRowSetMemoryOwner();
253  row_set_mem_owner->addVarlenInputBuffer(ab);
254  CHECK_EQ(Data_Namespace::GPU_LEVEL, memory_level);
255  CHECK(allocator);
256  auto chunk_iter_gpu = allocator->alloc(sizeof(ChunkIter));
257  allocator->copyToDevice(
258  chunk_iter_gpu, reinterpret_cast<int8_t*>(&chunk_iter), sizeof(ChunkIter));
259  return chunk_iter_gpu;
260  }
261  } else {
262  auto ab = chunk->getBuffer();
263  CHECK(ab->getMemoryPtr());
264  return ab->getMemoryPtr(); // @TODO(alex) change to use ChunkIter
265  }
266 }
#define CHECK_EQ(x, y)
Definition: Logger.h:211
std::vector< int > ChunkKey
Definition: types.h:37
std::string cat(Ts &&...args)
const SQLTypeInfo get_column_type(const int col_id, const int table_id, const ColumnDescriptor *cd, const TemporaryTables *temporary_tables)
Definition: Execute.h:237
Executor * executor_
#define CHECK_GT(x, y)
Definition: Logger.h:215
static std::shared_ptr< Chunk > getChunk(const ColumnDescriptor *cd, DataMgr *data_mgr, const ChunkKey &key, const MemoryLevel mem_level, const int deviceId, const size_t num_bytes, const size_t num_elems)
Definition: Chunk.cpp:28
#define CHECK(condition)
Definition: Logger.h:203
const ColumnDescriptor * get_column_descriptor(const int col_id, const int table_id, const Catalog_Namespace::Catalog &cat)
Definition: Execute.h:192

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

const int8_t * ColumnFetcher::getResultSetColumn ( const InputColDescriptor *  col_desc,
const Data_Namespace::MemoryLevel  memory_level,
const int  device_id,
DeviceAllocator *  device_allocator,
const size_t  thread_idx 
) const

Definition at line 332 of file ColumnFetcher.cpp.

References CHECK, executor_, get_temporary_table(), InputColDescriptor::getColId(), InputColDescriptor::getScanDesc(), and InputDescriptor::getTableId().

Referenced by Executor::fetchChunks(), and Executor::fetchUnionChunks().

337  {
338  CHECK(col_desc);
339  const auto table_id = col_desc->getScanDesc().getTableId();
340  return getResultSetColumn(get_temporary_table(executor_->temporary_tables_, table_id),
341  table_id,
342  col_desc->getColId(),
343  memory_level,
344  device_id,
345  device_allocator,
346  thread_idx);
347 }
const int8_t * getResultSetColumn(const InputColDescriptor *col_desc, const Data_Namespace::MemoryLevel memory_level, const int device_id, DeviceAllocator *device_allocator, const size_t thread_idx) const
Executor * executor_
const ResultSetPtr & get_temporary_table(const TemporaryTables *temporary_tables, const int table_id)
Definition: Execute.h:229
int getColId() const
int getTableId() const
#define CHECK(condition)
Definition: Logger.h:203
const InputDescriptor & getScanDesc() const

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

const int8_t * ColumnFetcher::getResultSetColumn ( const ResultSetPtr &  buffer,
const int  table_id,
const int  col_id,
const Data_Namespace::MemoryLevel  memory_level,
const int  device_id,
DeviceAllocator *  device_allocator,
const size_t  thread_idx 
) const
private

Definition at line 565 of file ColumnFetcher.cpp.

References CHECK_GE, CHECK_NE, columnar_fetch_mutex_, anonymous_namespace{ColumnFetcher.cpp}::columnarize_result(), columnarized_table_cache_, executor_, run_benchmark_import::result, and transferColumnIfNeeded().

572  {
573  const ColumnarResults* result{nullptr};
574  {
575  std::lock_guard<std::mutex> columnar_conversion_guard(columnar_fetch_mutex_);
576  if (columnarized_table_cache_.empty() || !columnarized_table_cache_.count(table_id)) {
577  columnarized_table_cache_.insert(std::make_pair(
578  table_id, std::unordered_map<int, std::shared_ptr<const ColumnarResults>>()));
579  }
580  auto& frag_id_to_result = columnarized_table_cache_[table_id];
581  int frag_id = 0;
582  if (frag_id_to_result.empty() || !frag_id_to_result.count(frag_id)) {
583  frag_id_to_result.insert(std::make_pair(
584  frag_id,
585  std::shared_ptr<const ColumnarResults>(columnarize_result(
586  executor_->row_set_mem_owner_, buffer, thread_idx, frag_id))));
587  }
588  CHECK_NE(size_t(0), columnarized_table_cache_.count(table_id));
589  result = columnarized_table_cache_[table_id][frag_id].get();
590  }
591  CHECK_GE(col_id, 0);
592  return transferColumnIfNeeded(result,
593  col_id,
594  &executor_->getCatalog()->getDataMgr(),
595  memory_level,
596  device_id,
597  device_allocator);
598 }
std::mutex columnar_fetch_mutex_
#define CHECK_GE(x, y)
Definition: Logger.h:216
ColumnCacheMap columnarized_table_cache_
Executor * executor_
#define CHECK_NE(x, y)
Definition: Logger.h:212
const ColumnarResults * columnarize_result(std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner, const ResultSetPtr &result, const size_t thread_idx, const int frag_id)
static const int8_t * transferColumnIfNeeded(const ColumnarResults *columnar_results, const int col_id, Data_Namespace::DataMgr *data_mgr, const Data_Namespace::MemoryLevel memory_level, const int device_id, DeviceAllocator *device_allocator)

+ Here is the call graph for this function:

const int8_t * ColumnFetcher::linearizeColumnFragments ( const int  table_id,
const int  col_id,
const std::map< int, const TableFragments * > &  all_tables_fragments,
std::list< std::shared_ptr< Chunk_NS::Chunk >> &  chunk_holder,
std::list< ChunkIter > &  chunk_iter_holder,
const Data_Namespace::MemoryLevel  memory_level,
const int  device_id,
DeviceAllocator *  device_allocator,
const size_t  thread_idx 
) const

Definition at line 349 of file ColumnFetcher.cpp.

References addMergedChunk(), Allocator::alloc(), cat(), CHECK, CHECK_EQ, CHECK_GT, check_interrupt(), columnar_fetch_mutex_, DeviceAllocator::copyToDevice(), Data_Namespace::CPU_LEVEL, Executor::ERR_INTERRUPTED, executor_, g_enable_non_kernel_time_query_interrupt, get_column_descriptor(), Chunk_NS::Chunk::getChunk(), getChunkiter(), InputColDescriptor::getScanDesc(), InputDescriptor::getSourceType(), Data_Namespace::GPU_LEVEL, linearized_multi_frag_chunk_iter_cache_, prepareChunkIter(), and TABLE.

Referenced by Executor::fetchChunks().

358  {
359  // todo(yoonmin): True varlen col linearization
360  const auto fragments_it = all_tables_fragments.find(table_id);
361  CHECK(fragments_it != all_tables_fragments.end());
362  const auto fragments = fragments_it->second;
363  const auto frag_count = fragments->size();
364  const InputColDescriptor col_desc(col_id, table_id, int(0));
365  const auto& cat = *executor_->getCatalog();
366  auto cd = get_column_descriptor(col_id, table_id, cat);
367  CHECK(cd);
368  CHECK(col_desc.getScanDesc().getSourceType() == InputSourceType::TABLE);
369  CHECK_GT(table_id, 0);
370  size_t total_num_tuples = 0;
371  size_t total_data_buf_size = 0;
372  size_t total_idx_buf_size = 0;
373 
374  std::lock_guard<std::mutex> linearize_guard(columnar_fetch_mutex_);
375  auto linearized_iter_it = linearized_multi_frag_chunk_iter_cache_.find(col_desc);
376  if (linearized_iter_it != linearized_multi_frag_chunk_iter_cache_.end()) {
377  if (memory_level == CPU_LEVEL) {
378  return getChunkiter(col_desc, 0);
379  } else {
380  // todo(yoonmin): D2D copy of merged chunk and its iter?
381  if (linearized_iter_it->second.find(device_id) !=
382  linearized_iter_it->second.end()) {
383  auto chunk_iter_gpu = device_allocator->alloc(sizeof(ChunkIter));
384  device_allocator->copyToDevice(
385  chunk_iter_gpu, getChunkiter(col_desc, device_id), sizeof(ChunkIter));
386  return chunk_iter_gpu;
387  }
388  }
389  }
390 
391  // collect target fragments
392  // in GPU execution, we first load chunks in CPU, and only merge them in GPU
393  std::shared_ptr<Chunk_NS::Chunk> chunk;
394  std::list<std::shared_ptr<Chunk_NS::Chunk>> local_chunk_holder;
395  std::list<ChunkIter> local_chunk_iter_holder;
396  for (size_t frag_id = 0; frag_id < frag_count; ++frag_id) {
397  const auto& fragment = (*fragments)[frag_id];
398  if (fragment.isEmptyPhysicalFragment()) {
399  continue;
400  }
401  auto chunk_meta_it = fragment.getChunkMetadataMap().find(col_id);
402  CHECK(chunk_meta_it != fragment.getChunkMetadataMap().end());
403  ChunkKey chunk_key{
404  cat.getCurrentDB().dbId, fragment.physicalTableId, col_id, fragment.fragmentId};
405  chunk = Chunk_NS::Chunk::getChunk(cd,
406  &cat.getDataMgr(),
407  chunk_key,
408  Data_Namespace::CPU_LEVEL,
409  0,
410  chunk_meta_it->second->numBytes,
411  chunk_meta_it->second->numElements);
412  local_chunk_holder.push_back(chunk);
413  local_chunk_iter_holder.push_back(chunk->begin_iterator(chunk_meta_it->second));
414  total_num_tuples += fragment.getNumTuples();
415  total_data_buf_size += chunk->getBuffer()->size();
416  if (chunk->getIndexBuf()) {
417  total_idx_buf_size += chunk->getIndexBuf()->size();
418  }
419  }
420  // linearize collected fragments
421  // todo(yoonmin): parallelize this step
422  auto merged_chunk_buffer =
423  cat.getDataMgr().alloc(memory_level,
424  memory_level == Data_Namespace::CPU_LEVEL ? 0 : device_id,
425  total_data_buf_size);
426  size_t sum_chunk_sizes = 0;
427  for (auto chunk_holder_it = local_chunk_holder.begin();
428  chunk_holder_it != local_chunk_holder.end();
429  chunk_holder_it++) {
430  if (g_enable_non_kernel_time_query_interrupt && check_interrupt()) {
431  cat.getDataMgr().free(merged_chunk_buffer);
432  throw QueryExecutionError(Executor::ERR_INTERRUPTED);
433  }
434  auto target_chunk = chunk_holder_it->get();
435  auto target_chunk_buffer = target_chunk->getBuffer();
436  merged_chunk_buffer->append(
437  target_chunk_buffer->getMemoryPtr(),
438  target_chunk_buffer->size(),
440  memory_level == Data_Namespace::CPU_LEVEL ? 0 : device_id);
441  sum_chunk_sizes += target_chunk_buffer->size();
442  }
443  // check whether each chunk's data buffer is clean under chunk merging
444  CHECK_EQ(total_data_buf_size, sum_chunk_sizes);
445 
446  // make ChunkIter for the linearized chunk
447  // todo(yoonmin): cache for merged chunk?
448  auto merged_chunk = std::make_shared<Chunk_NS::Chunk>(merged_chunk_buffer, nullptr, cd);
449  auto merged_chunk_iter = prepareChunkIter(
450  merged_chunk_buffer, *(chunk_iter_holder.begin()), total_num_tuples);
451  chunk_holder.push_back(merged_chunk);
452  chunk_iter_holder.push_back(merged_chunk_iter);
453  auto merged_chunk_iter_ptr = reinterpret_cast<int8_t*>(&(chunk_iter_holder.back()));
454  if (memory_level == MemoryLevel::CPU_LEVEL) {
455  addMergedChunk(col_desc, 0, merged_chunk, merged_chunk_iter_ptr);
456  return merged_chunk_iter_ptr;
457  } else {
458  CHECK_EQ(Data_Namespace::GPU_LEVEL, memory_level);
459  CHECK(device_allocator);
460  auto ab = merged_chunk->getBuffer();
461  ab->pin();
462  addMergedChunk(col_desc, device_id, merged_chunk, merged_chunk_iter_ptr);
463  auto chunk_iter_gpu = device_allocator->alloc(sizeof(ChunkIter));
464  device_allocator->copyToDevice(
465  chunk_iter_gpu, merged_chunk_iter_ptr, sizeof(ChunkIter));
466  return chunk_iter_gpu;
467  }
468 }
#define CHECK_EQ(x, y)
Definition: Logger.h:211
std::vector< int > ChunkKey
Definition: types.h:37
std::string cat(Ts &&...args)
static const int32_t ERR_INTERRUPTED
Definition: Execute.h:1120
std::mutex columnar_fetch_mutex_
virtual void copyToDevice(int8_t *device_dst, const int8_t *host_src, const size_t num_bytes) const =0
virtual int8_t * alloc(const size_t num_bytes)=0
const int8_t * getChunkiter(const InputColDescriptor col_desc, const int device_id=0) const
Executor * executor_
bool g_enable_non_kernel_time_query_interrupt
Definition: Execute.cpp:115
#define CHECK_GT(x, y)
Definition: Logger.h:215
void addMergedChunk(const InputColDescriptor col_desc, const int device_id, std::shared_ptr< Chunk_NS::Chunk > chunk_ptr, int8_t *chunk_iter_ptr) const
__device__ bool check_interrupt()
static std::shared_ptr< Chunk > getChunk(const ColumnDescriptor *cd, DataMgr *data_mgr, const ChunkKey &key, const MemoryLevel mem_level, const int deviceId, const size_t num_bytes, const size_t num_elems)
Definition: Chunk.cpp:28
#define CHECK(condition)
Definition: Logger.h:203
std::unordered_map< InputColDescriptor, DeviceMergedChunkIterMap > linearized_multi_frag_chunk_iter_cache_
ChunkIter prepareChunkIter(AbstractBuffer *merged, ChunkIter &chunk_iter, const size_t total_num_tuples) const
const ColumnDescriptor * get_column_descriptor(const int col_id, const int table_id, const Catalog_Namespace::Catalog &cat)
Definition: Execute.h:192

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

JoinColumn ColumnFetcher::makeJoinColumn ( Executor executor,
const Analyzer::ColumnVar hash_col,
const std::vector< Fragmenter_Namespace::FragmentInfo > &  fragments,
const Data_Namespace::MemoryLevel  effective_mem_lvl,
const int  device_id,
DeviceAllocator device_allocator,
const size_t  thread_idx,
std::vector< std::shared_ptr< Chunk_NS::Chunk >> &  chunks_owner,
std::vector< std::shared_ptr< void >> &  malloc_owner,
ColumnCacheMap column_cache 
)
static

Creates a JoinColumn struct containing an array of JoinChunk structs.

makeJoinColumn() creates a JoinColumn struct containing an array of JoinChunk structs, col_chunks_buff, malloced in CPU memory. Although the col_chunks_buff array is in CPU memory here, each JoinChunk struct contains an int8_t* pointer from getOneColumnFragment(), col_buff, that can point to either CPU memory or GPU memory depending on the effective_mem_lvl parameter. See also the fetchJoinColumn() function where col_chunks_buff is copied into GPU memory if needed. The malloc_owner parameter will have the malloced array appended. The chunks_owner parameter will be appended with the chunks.

Definition at line 137 of file ColumnFetcher.cpp.

References CHECK, CHECK_GT, check_interrupt(), checked_malloc(), JoinChunk::col_buff, Data_Namespace::CPU_LEVEL, Executor::ERR_INTERRUPTED, g_enable_non_kernel_time_query_interrupt, SQLTypeInfo::get_size(), Analyzer::Expr::get_type_info(), getOneColumnFragment(), and JoinChunk::num_elems.

Referenced by HashJoin::fetchJoinColumn().

147  {
// Builds a CPU-resident array of JoinChunk descriptors (one per fragment that
// yields a non-null column buffer). Each descriptor's col_buff may point to
// CPU or GPU memory depending on effective_mem_lvl (see getOneColumnFragment
// and the copy performed later in HashJoin::fetchJoinColumn).
148  CHECK(!fragments.empty());
149 
150  size_t col_chunks_buff_sz = sizeof(struct JoinChunk) * fragments.size();
151  // TODO: needs an allocator owner
// The malloc'ed descriptor array's lifetime is tied to malloc_owner, which
// frees it with free() when the owning vector is destroyed.
152  auto col_chunks_buff = reinterpret_cast<int8_t*>(
153  malloc_owner.emplace_back(checked_malloc(col_chunks_buff_sz), free).get());
154  auto join_chunk_array = reinterpret_cast<struct JoinChunk*>(col_chunks_buff);
155 
156  size_t num_elems = 0;
157  size_t num_chunks = 0;
158  for (auto& frag : fragments) {
// NOTE(review): listing lines 159-160 are elided here; per the References list
// above (g_enable_non_kernel_time_query_interrupt, check_interrupt(),
// Executor::ERR_INTERRUPTED) they presumably perform a query-interrupt check
// that aborts the fetch -- confirm against ColumnFetcher.cpp line ~159.
161  }
162  auto [col_buff, elem_count] = getOneColumnFragment(
163  executor,
164  hash_col,
165  frag,
166  effective_mem_lvl,
// device_id is only meaningful for GPU fetches; CPU-level fetches use device 0.
167  effective_mem_lvl == Data_Namespace::CPU_LEVEL ? 0 : device_id,
168  device_allocator,
169  thread_idx,
170  chunks_owner,
171  column_cache);
172  if (col_buff != nullptr) {
173  num_elems += elem_count;
174  join_chunk_array[num_chunks] = JoinChunk{col_buff, elem_count};
175  } else {
// Fragments producing no buffer are skipped and do not advance num_chunks,
// so the descriptor array may have fewer entries than fragments.size().
176  continue;
177  }
178  ++num_chunks;
179  }
180 
// Fixed per-element byte width taken from the hash column's type; must be
// a positive (fixed-width) size for the join chunks to be addressable.
181  int elem_sz = hash_col.get_type_info().get_size();
182  CHECK_GT(elem_sz, 0);
183 
184  return {col_chunks_buff,
185  col_chunks_buff_sz,
186  num_chunks,
187  num_elems,
188  static_cast<size_t>(elem_sz)};
189 }
HOST DEVICE int get_size() const
Definition: sqltypes.h:324
static const int32_t ERR_INTERRUPTED
Definition: Execute.h:1120
bool g_enable_non_kernel_time_query_interrupt
Definition: Execute.cpp:115
const int8_t * col_buff
#define CHECK_GT(x, y)
Definition: Logger.h:215
__device__ bool check_interrupt()
void * checked_malloc(const size_t size)
Definition: checked_alloc.h:45
const SQLTypeInfo & get_type_info() const
Definition: Analyzer.h:78
static std::pair< const int8_t *, size_t > getOneColumnFragment(Executor *executor, const Analyzer::ColumnVar &hash_col, const Fragmenter_Namespace::FragmentInfo &fragment, const Data_Namespace::MemoryLevel effective_mem_lvl, const int device_id, DeviceAllocator *device_allocator, const size_t thread_idx, std::vector< std::shared_ptr< Chunk_NS::Chunk >> &chunks_owner, ColumnCacheMap &column_cache)
Gets one chunk's pointer and element count on either CPU or GPU.
#define CHECK(condition)
Definition: Logger.h:203
size_t num_elems

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

ChunkIter ColumnFetcher::prepareChunkIter ( AbstractBuffer merged,
ChunkIter chunk_iter,
const size_t  total_num_tuples 
) const
private

Definition at line 551 of file ColumnFetcher.cpp.

References ChunkIter::current_pos, ChunkIter::end_pos, Data_Namespace::AbstractBuffer::getMemoryPtr(), ChunkIter::num_elems, Data_Namespace::AbstractBuffer::size(), ChunkIter::skip, ChunkIter::skip_size, ChunkIter::start_pos, and ChunkIter::type_info.

Referenced by linearizeColumnFragments().

553  {
// Constructs a ChunkIter over the merged (linearized) buffer: start/current/
// end positions come from the merged buffer itself, while the per-element
// layout fields (skip, skip_size, type_info) are inherited from an iterator
// over one of the source chunks, since linearization preserves element layout.
554  ChunkIter merged_chunk_iter;
555  merged_chunk_iter.start_pos = merged->getMemoryPtr();
556  merged_chunk_iter.current_pos = merged_chunk_iter.start_pos;
557  merged_chunk_iter.end_pos = merged->getMemoryPtr() + merged->size();
558  merged_chunk_iter.num_elems = total_num_tuples;
559  merged_chunk_iter.skip = chunk_iter.skip;
560  merged_chunk_iter.skip_size = chunk_iter.skip_size;
561  merged_chunk_iter.type_info = chunk_iter.type_info;
562  return merged_chunk_iter;
563 }
int8_t * start_pos
Definition: ChunkIter.h:33
int8_t * current_pos
Definition: ChunkIter.h:32
SQLTypeInfo type_info
Definition: ChunkIter.h:30
virtual int8_t * getMemoryPtr()=0
int8_t * end_pos
Definition: ChunkIter.h:34
size_t num_elems
Definition: ChunkIter.h:37
int skip_size
Definition: ChunkIter.h:36
int skip
Definition: ChunkIter.h:35

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

const int8_t * ColumnFetcher::transferColumnIfNeeded ( const ColumnarResults columnar_results,
const int  col_id,
Data_Namespace::DataMgr data_mgr,
const Data_Namespace::MemoryLevel  memory_level,
const int  device_id,
DeviceAllocator device_allocator 
)
staticprivate

Definition at line 470 of file ColumnFetcher.cpp.

References Allocator::alloc(), CHECK, CHECK_LT, DeviceAllocator::copyToDevice(), ColumnarResults::getColumnBuffers(), ColumnarResults::getColumnType(), Data_Namespace::GPU_LEVEL, and ColumnarResults::size().

Referenced by getAllTableColumnFragments(), getOneColumnFragment(), and getResultSetColumn().

476  {
// Returns a pointer to the columnar buffer for col_id, copying it to device
// memory first when memory_level is GPU_LEVEL. Returns nullptr when there are
// no columnar results. CPU-level requests return the in-place host buffer
// without any copy; the GPU buffer's lifetime is owned by device_allocator.
477  if (!columnar_results) {
478  return nullptr;
479  }
480  const auto& col_buffers = columnar_results->getColumnBuffers();
481  CHECK_LT(static_cast<size_t>(col_id), col_buffers.size());
482  if (memory_level == Data_Namespace::GPU_LEVEL) {
483  const auto& col_ti = columnar_results->getColumnType(col_id);
// Byte count = row count * fixed element width of the column's type.
484  const auto num_bytes = columnar_results->size() * col_ti.get_size();
485  CHECK(device_allocator);
486  auto gpu_col_buffer = device_allocator->alloc(num_bytes);
487  device_allocator->copyToDevice(gpu_col_buffer, col_buffers[col_id], num_bytes);
488  return gpu_col_buffer;
489  }
490  return col_buffers[col_id];
491 }
virtual void copyToDevice(int8_t *device_dst, const int8_t *host_src, const size_t num_bytes) const =0
virtual int8_t * alloc(const size_t num_bytes)=0
const size_t size() const
#define CHECK_LT(x, y)
Definition: Logger.h:213
#define CHECK(condition)
Definition: Logger.h:203
const std::vector< int8_t * > & getColumnBuffers() const
const SQLTypeInfo & getColumnType(const int col_id) const

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

Friends And Related Function Documentation

friend class QueryCompilationDescriptor
friend

Definition at line 159 of file ColumnFetcher.h.

friend class TableFunctionExecutionContext
friend

Definition at line 160 of file ColumnFetcher.h.

Member Data Documentation

std::mutex ColumnFetcher::columnar_fetch_mutex_
mutableprivate
std::unordered_map< InputColDescriptor, std::unordered_map<CacheKey, std::unique_ptr<const ColumnarResults> > > ColumnFetcher::columnarized_ref_table_cache_
mutableprivate

Definition at line 149 of file ColumnFetcher.h.

std::unordered_map<InputColDescriptor, std::unique_ptr<const ColumnarResults> > ColumnFetcher::columnarized_scan_table_cache_
mutableprivate

Definition at line 151 of file ColumnFetcher.h.

Referenced by getAllTableColumnFragments().

ColumnCacheMap ColumnFetcher::columnarized_table_cache_
mutableprivate
Executor* ColumnFetcher::executor_
private
std::unordered_map<InputColDescriptor, DeviceMergedChunkIterMap> ColumnFetcher::linearized_multi_frag_chunk_iter_cache_
mutableprivate

Definition at line 157 of file ColumnFetcher.h.

Referenced by addMergedChunk(), getChunkiter(), and linearizeColumnFragments().

std::unordered_map<InputColDescriptor, DeviceMergedChunkMap> ColumnFetcher::linearized_multi_frag_table_cache_
mutableprivate

Definition at line 154 of file ColumnFetcher.h.

Referenced by addMergedChunk().


The documentation for this class was generated from the following files: