OmniSciDB  21ac014ffc
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
ColumnFetcher Class Reference

#include <ColumnFetcher.h>

+ Collaboration diagram for ColumnFetcher:

Public Member Functions

 ColumnFetcher (Executor *executor, const ColumnCacheMap &column_cache)
 
const int8_t * getOneTableColumnFragment (const int table_id, const int frag_id, const int col_id, const std::map< int, const TableFragments * > &all_tables_fragments, std::list< std::shared_ptr< Chunk_NS::Chunk >> &chunk_holder, std::list< ChunkIter > &chunk_iter_holder, const Data_Namespace::MemoryLevel memory_level, const int device_id, DeviceAllocator *device_allocator) const
 
const int8_t * getAllTableColumnFragments (const int table_id, const int col_id, const std::map< int, const TableFragments * > &all_tables_fragments, const Data_Namespace::MemoryLevel memory_level, const int device_id, DeviceAllocator *device_allocator, const size_t thread_idx) const
 
const int8_t * getResultSetColumn (const InputColDescriptor *col_desc, const Data_Namespace::MemoryLevel memory_level, const int device_id, DeviceAllocator *device_allocator, const size_t thread_idx) const
 
const int8_t * linearizeColumnFragments (const int table_id, const int col_id, const std::map< int, const TableFragments * > &all_tables_fragments, std::list< std::shared_ptr< Chunk_NS::Chunk >> &chunk_holder, std::list< ChunkIter > &chunk_iter_holder, const Data_Namespace::MemoryLevel memory_level, const int device_id, DeviceAllocator *device_allocator, const size_t thread_idx) const
 
void freeTemporaryCpuLinearizedIdxBuf ()
 
void freeLinearizedBuf ()
 

Static Public Member Functions

static std::pair< const int8_t
*, size_t > 
getOneColumnFragment (Executor *executor, const Analyzer::ColumnVar &hash_col, const Fragmenter_Namespace::FragmentInfo &fragment, const Data_Namespace::MemoryLevel effective_mem_lvl, const int device_id, DeviceAllocator *device_allocator, const size_t thread_idx, std::vector< std::shared_ptr< Chunk_NS::Chunk >> &chunks_owner, ColumnCacheMap &column_cache)
 Gets one chunk's pointer and element count on either CPU or GPU. More...
 
static JoinColumn makeJoinColumn (Executor *executor, const Analyzer::ColumnVar &hash_col, const std::vector< Fragmenter_Namespace::FragmentInfo > &fragments, const Data_Namespace::MemoryLevel effective_mem_lvl, const int device_id, DeviceAllocator *device_allocator, const size_t thread_idx, std::vector< std::shared_ptr< Chunk_NS::Chunk >> &chunks_owner, std::vector< std::shared_ptr< void >> &malloc_owner, ColumnCacheMap &column_cache)
 Creates a JoinColumn struct containing an array of JoinChunk structs. More...
 

Private Types

using DeviceMergedChunkIterMap = std::unordered_map< int, int8_t * >
 
using DeviceMergedChunkMap = std::unordered_map< int, AbstractBuffer * >
 

Private Member Functions

MergedChunk linearizeVarLenArrayColFrags (const Catalog_Namespace::Catalog &cat, std::list< std::shared_ptr< Chunk_NS::Chunk >> &chunk_holder, std::list< ChunkIter > &chunk_iter_holder, std::list< std::shared_ptr< Chunk_NS::Chunk >> &local_chunk_holder, std::list< ChunkIter > &local_chunk_iter_holder, std::list< size_t > &local_chunk_num_tuples, MemoryLevel memory_level, const ColumnDescriptor *cd, const int device_id, const size_t total_data_buf_size, const size_t total_idx_buf_size, const size_t total_num_tuples, DeviceAllocator *device_allocator, const size_t thread_idx) const
 
MergedChunk linearizeFixedLenArrayColFrags (const Catalog_Namespace::Catalog &cat, std::list< std::shared_ptr< Chunk_NS::Chunk >> &chunk_holder, std::list< ChunkIter > &chunk_iter_holder, std::list< std::shared_ptr< Chunk_NS::Chunk >> &local_chunk_holder, std::list< ChunkIter > &local_chunk_iter_holder, std::list< size_t > &local_chunk_num_tuples, MemoryLevel memory_level, const ColumnDescriptor *cd, const int device_id, const size_t total_data_buf_size, const size_t total_idx_buf_size, const size_t total_num_tuples, DeviceAllocator *device_allocator, const size_t thread_idx) const
 
void addMergedChunkIter (const InputColDescriptor col_desc, const int device_id, int8_t *chunk_iter_ptr) const
 
const int8_t * getChunkiter (const InputColDescriptor col_desc, const int device_id=0) const
 
ChunkIter prepareChunkIter (AbstractBuffer *merged_data_buf, AbstractBuffer *merged_index_buf, ChunkIter &chunk_iter, bool is_true_varlen_type, const size_t total_num_tuples) const
 
const int8_t * getResultSetColumn (const ResultSetPtr &buffer, const int table_id, const int col_id, const Data_Namespace::MemoryLevel memory_level, const int device_id, DeviceAllocator *device_allocator, const size_t thread_idx) const
 

Static Private Member Functions

static const int8_t * transferColumnIfNeeded (const ColumnarResults *columnar_results, const int col_id, Data_Namespace::DataMgr *data_mgr, const Data_Namespace::MemoryLevel memory_level, const int device_id, DeviceAllocator *device_allocator)
 

Private Attributes

Executor * executor_
 
std::mutex columnar_fetch_mutex_
 
std::mutex varlen_chunk_fetch_mutex_
 
std::mutex linearization_mutex_
 
std::mutex chunk_list_mutex_
 
std::mutex linearized_col_cache_mutex_
 
ColumnCacheMap columnarized_table_cache_
 
std::unordered_map
< InputColDescriptor,
std::unique_ptr< const
ColumnarResults > > 
columnarized_scan_table_cache_
 
std::unordered_map
< InputColDescriptor,
DeviceMergedChunkIterMap > 
linearized_multi_frag_chunk_iter_cache_
 
std::unordered_map< int,
AbstractBuffer * > 
linearlized_temporary_cpu_index_buf_cache_
 
std::unordered_map
< InputColDescriptor,
DeviceMergedChunkMap > 
linearized_data_buf_cache_
 
std::unordered_map
< InputColDescriptor,
DeviceMergedChunkMap > 
linearized_idx_buf_cache_
 

Friends

class QueryCompilationDescriptor
 
class TableFunctionExecutionContext
 

Detailed Description

Definition at line 49 of file ColumnFetcher.h.

Member Typedef Documentation

using ColumnFetcher::DeviceMergedChunkIterMap = std::unordered_map<int, int8_t*>
private

Definition at line 189 of file ColumnFetcher.h.

using ColumnFetcher::DeviceMergedChunkMap = std::unordered_map<int, AbstractBuffer*>
private

Definition at line 190 of file ColumnFetcher.h.

Constructor & Destructor Documentation

ColumnFetcher::ColumnFetcher ( Executor *  executor,
const ColumnCacheMap &  column_cache 
)

Definition at line 60 of file ColumnFetcher.cpp.

61  : executor_(executor), columnarized_table_cache_(column_cache) {}
ColumnCacheMap columnarized_table_cache_
Executor * executor_

Member Function Documentation

void ColumnFetcher::addMergedChunkIter ( const InputColDescriptor  col_desc,
const int  device_id,
int8_t *  chunk_iter_ptr 
) const
private

Definition at line 919 of file ColumnFetcher.cpp.

References InputColDescriptor::getColId(), InputColDescriptor::getScanDesc(), InputDescriptor::getTableId(), linearized_col_cache_mutex_, linearized_multi_frag_chunk_iter_cache_, and VLOG.

Referenced by linearizeColumnFragments().

921  {
922  std::lock_guard<std::mutex> linearize_guard(linearized_col_cache_mutex_);
923  auto chunk_iter_it = linearized_multi_frag_chunk_iter_cache_.find(col_desc);
924  if (chunk_iter_it != linearized_multi_frag_chunk_iter_cache_.end()) {
925  auto iter_device_it = chunk_iter_it->second.find(device_id);
926  if (iter_device_it == chunk_iter_it->second.end()) {
927  VLOG(2) << "Additional merged chunk_iter for col_desc (tbl: "
928  << col_desc.getScanDesc().getTableId() << ", col: " << col_desc.getColId()
929  << "), device_id: " << device_id;
930  chunk_iter_it->second.emplace(device_id, chunk_iter_ptr);
931  }
932  } else {
933  DeviceMergedChunkIterMap iter_m;
934  iter_m.emplace(device_id, chunk_iter_ptr);
935  VLOG(2) << "New merged chunk_iter for col_desc (tbl: "
936  << col_desc.getScanDesc().getTableId() << ", col: " << col_desc.getColId()
937  << "), device_id: " << device_id;
938  linearized_multi_frag_chunk_iter_cache_.emplace(col_desc, iter_m);
939  }
940 }
std::unordered_map< int, int8_t * > DeviceMergedChunkIterMap
int getColId() const
int getTableId() const
std::mutex linearized_col_cache_mutex_
std::unordered_map< InputColDescriptor, DeviceMergedChunkIterMap > linearized_multi_frag_chunk_iter_cache_
const InputDescriptor & getScanDesc() const
#define VLOG(n)
Definition: Logger.h:300

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void ColumnFetcher::freeLinearizedBuf ( )

Definition at line 982 of file ColumnFetcher.cpp.

References cat(), executor_, linearized_col_cache_mutex_, linearized_data_buf_cache_, and linearized_idx_buf_cache_.

Referenced by Executor::executeWorkUnitImpl().

982  {
983  std::lock_guard<std::mutex> linearized_col_cache_guard(linearized_col_cache_mutex_);
984  const auto& cat = *executor_->getCatalog();
985  auto& data_mgr = cat.getDataMgr();
986 
987  if (!linearized_data_buf_cache_.empty()) {
988  for (auto& kv : linearized_data_buf_cache_) {
989  for (auto& kv2 : kv.second) {
990  data_mgr.free(kv2.second);
991  }
992  }
993  }
994 
995  if (!linearized_idx_buf_cache_.empty()) {
996  for (auto& kv : linearized_idx_buf_cache_) {
997  for (auto& kv2 : kv.second) {
998  data_mgr.free(kv2.second);
999  }
1000  }
1001  }
1002 }
std::string cat(Ts &&...args)
Executor * executor_
std::unordered_map< InputColDescriptor, DeviceMergedChunkMap > linearized_data_buf_cache_
std::mutex linearized_col_cache_mutex_
std::unordered_map< InputColDescriptor, DeviceMergedChunkMap > linearized_idx_buf_cache_

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void ColumnFetcher::freeTemporaryCpuLinearizedIdxBuf ( )

Definition at line 1004 of file ColumnFetcher.cpp.

References cat(), executor_, linearized_col_cache_mutex_, and linearlized_temporary_cpu_index_buf_cache_.

Referenced by Executor::executeWorkUnitImpl().

1004  {
1005  std::lock_guard<std::mutex> linearized_col_cache_guard(linearized_col_cache_mutex_);
1006  const auto& cat = *executor_->getCatalog();
1007  auto& data_mgr = cat.getDataMgr();
1008  if (!linearlized_temporary_cpu_index_buf_cache_.empty()) {
1009  for (auto& kv : linearlized_temporary_cpu_index_buf_cache_) {
1010  data_mgr.free(kv.second);
1011  }
1012  }
1013 }
std::string cat(Ts &&...args)
Executor * executor_
std::unordered_map< int, AbstractBuffer * > linearlized_temporary_cpu_index_buf_cache_
std::mutex linearized_col_cache_mutex_

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

const int8_t * ColumnFetcher::getAllTableColumnFragments ( const int  table_id,
const int  col_id,
const std::map< int, const TableFragments * > &  all_tables_fragments,
const Data_Namespace::MemoryLevel  memory_level,
const int  device_id,
DeviceAllocator *  device_allocator,
const size_t  thread_idx 
) const

Definition at line 282 of file ColumnFetcher.cpp.

References CHECK, check_interrupt(), columnar_fetch_mutex_, columnarized_scan_table_cache_, Data_Namespace::CPU_LEVEL, Executor::ERR_INTERRUPTED, executor_, g_enable_non_kernel_time_query_interrupt, getOneTableColumnFragment(), InputColDescriptor::getScanDesc(), InputDescriptor::getSourceType(), ColumnarResults::mergeResults(), TABLE, and transferColumnIfNeeded().

Referenced by Executor::fetchChunks(), and Executor::fetchUnionChunks().

289  {
290  const auto fragments_it = all_tables_fragments.find(table_id);
291  CHECK(fragments_it != all_tables_fragments.end());
292  const auto fragments = fragments_it->second;
293  const auto frag_count = fragments->size();
294  std::vector<std::unique_ptr<ColumnarResults>> column_frags;
295  const ColumnarResults* table_column = nullptr;
296  const InputColDescriptor col_desc(col_id, table_id, int(0));
297  CHECK(col_desc.getScanDesc().getSourceType() == InputSourceType::TABLE);
298  {
299  std::lock_guard<std::mutex> columnar_conversion_guard(columnar_fetch_mutex_);
300  auto column_it = columnarized_scan_table_cache_.find(col_desc);
301  if (column_it == columnarized_scan_table_cache_.end()) {
302  for (size_t frag_id = 0; frag_id < frag_count; ++frag_id) {
303  if (g_enable_non_kernel_time_query_interrupt && check_interrupt()) {
304  throw QueryExecutionError(Executor::ERR_INTERRUPTED);
305  }
306  std::list<std::shared_ptr<Chunk_NS::Chunk>> chunk_holder;
307  std::list<ChunkIter> chunk_iter_holder;
308  const auto& fragment = (*fragments)[frag_id];
309  if (fragment.isEmptyPhysicalFragment()) {
310  continue;
311  }
312  auto chunk_meta_it = fragment.getChunkMetadataMap().find(col_id);
313  CHECK(chunk_meta_it != fragment.getChunkMetadataMap().end());
314  auto col_buffer = getOneTableColumnFragment(table_id,
315  static_cast<int>(frag_id),
316  col_id,
317  all_tables_fragments,
318  chunk_holder,
319  chunk_iter_holder,
320  Data_Namespace::CPU_LEVEL,
321  int(0),
322  device_allocator);
323  column_frags.push_back(
324  std::make_unique<ColumnarResults>(executor_->row_set_mem_owner_,
325  col_buffer,
326  fragment.getNumTuples(),
327  chunk_meta_it->second->sqlType,
328  thread_idx));
329  }
330  auto merged_results =
331  ColumnarResults::mergeResults(executor_->row_set_mem_owner_, column_frags);
332  table_column = merged_results.get();
333  columnarized_scan_table_cache_.emplace(col_desc, std::move(merged_results));
334  } else {
335  table_column = column_it->second.get();
336  }
337  }
338  return ColumnFetcher::transferColumnIfNeeded(table_column,
339  0,
340  executor_->getDataMgr(),
341  memory_level,
342  device_id,
343  device_allocator);
344 }
static const int32_t ERR_INTERRUPTED
Definition: Execute.h:1142
static std::unique_ptr< ColumnarResults > mergeResults(const std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner, const std::vector< std::unique_ptr< ColumnarResults >> &sub_results)
std::mutex columnar_fetch_mutex_
Executor * executor_
bool g_enable_non_kernel_time_query_interrupt
Definition: Execute.cpp:115
std::unordered_map< InputColDescriptor, std::unique_ptr< const ColumnarResults > > columnarized_scan_table_cache_
const int8_t * getOneTableColumnFragment(const int table_id, const int frag_id, const int col_id, const std::map< int, const TableFragments * > &all_tables_fragments, std::list< std::shared_ptr< Chunk_NS::Chunk >> &chunk_holder, std::list< ChunkIter > &chunk_iter_holder, const Data_Namespace::MemoryLevel memory_level, const int device_id, DeviceAllocator *device_allocator) const
__device__ bool check_interrupt()
static const int8_t * transferColumnIfNeeded(const ColumnarResults *columnar_results, const int col_id, Data_Namespace::DataMgr *data_mgr, const Data_Namespace::MemoryLevel memory_level, const int device_id, DeviceAllocator *device_allocator)
#define CHECK(condition)
Definition: Logger.h:206

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

const int8_t * ColumnFetcher::getChunkiter ( const InputColDescriptor  col_desc,
const int  device_id = 0 
) const
private

Definition at line 942 of file ColumnFetcher.cpp.

References InputColDescriptor::getColId(), InputColDescriptor::getScanDesc(), InputDescriptor::getTableId(), linearized_multi_frag_chunk_iter_cache_, and VLOG.

Referenced by linearizeColumnFragments().

943  {
944  auto linearized_chunk_iter_it = linearized_multi_frag_chunk_iter_cache_.find(col_desc);
945  if (linearized_chunk_iter_it != linearized_multi_frag_chunk_iter_cache_.end()) {
946  auto dev_iter_map_it = linearized_chunk_iter_it->second.find(device_id);
947  if (dev_iter_map_it != linearized_chunk_iter_it->second.end()) {
948  VLOG(2) << "Recycle merged chunk_iter for col_desc (tbl: "
949  << col_desc.getScanDesc().getTableId() << ", col: " << col_desc.getColId()
950  << "), device_id: " << device_id;
951  return dev_iter_map_it->second;
952  }
953  }
954  return nullptr;
955 }
int getColId() const
int getTableId() const
std::unordered_map< InputColDescriptor, DeviceMergedChunkIterMap > linearized_multi_frag_chunk_iter_cache_
const InputDescriptor & getScanDesc() const
#define VLOG(n)
Definition: Logger.h:300

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

std::pair< const int8_t *, size_t > ColumnFetcher::getOneColumnFragment ( Executor *  executor,
const Analyzer::ColumnVar &  hash_col,
const Fragmenter_Namespace::FragmentInfo &  fragment,
const Data_Namespace::MemoryLevel  effective_mem_lvl,
const int  device_id,
DeviceAllocator *  device_allocator,
const size_t  thread_idx,
std::vector< std::shared_ptr< Chunk_NS::Chunk >> &  chunks_owner,
ColumnCacheMap &  column_cache 
)
static

Gets one chunk's pointer and element count on either CPU or GPU.

Gets a column fragment chunk on CPU or on GPU depending on the effective memory level parameter. For temporary tables, the chunk will be copied to the GPU if needed. Returns a buffer pointer and an element count.

Definition at line 66 of file ColumnFetcher.cpp.

References CHECK, anonymous_namespace{ColumnFetcher.cpp}::columnarize_result(), Data_Namespace::CPU_LEVEL, DEBUG_TIMER, Fragmenter_Namespace::FragmentInfo::fragmentId, get_column_descriptor_maybe(), Analyzer::ColumnVar::get_column_id(), Analyzer::ColumnVar::get_table_id(), get_temporary_table(), Chunk_NS::Chunk::getChunk(), Fragmenter_Namespace::FragmentInfo::getChunkMetadataMap(), Fragmenter_Namespace::FragmentInfo::getNumTuples(), Fragmenter_Namespace::FragmentInfo::isEmptyPhysicalFragment(), Fragmenter_Namespace::FragmentInfo::physicalTableId, and transferColumnIfNeeded().

Referenced by RelAlgExecutor::createWindowFunctionContext(), TableFunctionExecutionContext::execute(), and makeJoinColumn().

75  {
76  static std::mutex columnar_conversion_mutex;
77  auto timer = DEBUG_TIMER(__func__);
78  if (fragment.isEmptyPhysicalFragment()) {
79  return {nullptr, 0};
80  }
81  const auto table_id = hash_col.get_table_id();
82  const auto& catalog = *executor->getCatalog();
83  const auto cd =
84  get_column_descriptor_maybe(hash_col.get_column_id(), table_id, catalog);
85  CHECK(!cd || !(cd->isVirtualCol));
86  const int8_t* col_buff = nullptr;
87  if (cd) { // real table
88  /* chunk_meta_it is used here to retrieve chunk numBytes and
89  numElements. Apparently, their values are often zeros. If we
90  knew how to predict the zero values, calling
91  getChunkMetadataMap could be avoided to skip
92  synthesize_metadata calls. */
93  auto chunk_meta_it = fragment.getChunkMetadataMap().find(hash_col.get_column_id());
94  CHECK(chunk_meta_it != fragment.getChunkMetadataMap().end());
95  ChunkKey chunk_key{catalog.getCurrentDB().dbId,
96  fragment.physicalTableId,
97  hash_col.get_column_id(),
98  fragment.fragmentId};
99  const auto chunk = Chunk_NS::Chunk::getChunk(
100  cd,
101  &catalog.getDataMgr(),
102  chunk_key,
103  effective_mem_lvl,
104  effective_mem_lvl == Data_Namespace::CPU_LEVEL ? 0 : device_id,
105  chunk_meta_it->second->numBytes,
106  chunk_meta_it->second->numElements);
107  chunks_owner.push_back(chunk);
108  CHECK(chunk);
109  auto ab = chunk->getBuffer();
110  CHECK(ab->getMemoryPtr());
111  col_buff = reinterpret_cast<int8_t*>(ab->getMemoryPtr());
112  } else { // temporary table
113  const ColumnarResults* col_frag{nullptr};
114  {
115  std::lock_guard<std::mutex> columnar_conversion_guard(columnar_conversion_mutex);
116  const auto frag_id = fragment.fragmentId;
117  if (column_cache.empty() || !column_cache.count(table_id)) {
118  column_cache.insert(std::make_pair(
119  table_id, std::unordered_map<int, std::shared_ptr<const ColumnarResults>>()));
120  }
121  auto& frag_id_to_result = column_cache[table_id];
122  if (frag_id_to_result.empty() || !frag_id_to_result.count(frag_id)) {
123  frag_id_to_result.insert(
124  std::make_pair(frag_id,
125  std::shared_ptr<const ColumnarResults>(columnarize_result(
126  executor->row_set_mem_owner_,
127  get_temporary_table(executor->temporary_tables_, table_id),
128  thread_idx,
129  frag_id))));
130  }
131  col_frag = column_cache[table_id][frag_id].get();
132  }
133  col_buff = transferColumnIfNeeded(
134  col_frag,
135  hash_col.get_column_id(),
136  &catalog.getDataMgr(),
137  effective_mem_lvl,
138  effective_mem_lvl == Data_Namespace::CPU_LEVEL ? 0 : device_id,
139  device_allocator);
140  }
141  return {col_buff, fragment.getNumTuples()};
142 }
int get_table_id() const
Definition: Analyzer.h:194
std::vector< int > ChunkKey
Definition: types.h:37
const ResultSetPtr & get_temporary_table(const TemporaryTables *temporary_tables, const int table_id)
Definition: Execute.h:228
const ColumnDescriptor * get_column_descriptor_maybe(const int col_id, const int table_id, const Catalog_Namespace::Catalog &cat)
Definition: Execute.h:220
static std::shared_ptr< Chunk > getChunk(const ColumnDescriptor *cd, DataMgr *data_mgr, const ChunkKey &key, const MemoryLevel mem_level, const int deviceId, const size_t num_bytes, const size_t num_elems)
Definition: Chunk.cpp:28
const ChunkMetadataMap & getChunkMetadataMap() const
const ColumnarResults * columnarize_result(std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner, const ResultSetPtr &result, const size_t thread_idx, const int frag_id)
static const int8_t * transferColumnIfNeeded(const ColumnarResults *columnar_results, const int col_id, Data_Namespace::DataMgr *data_mgr, const Data_Namespace::MemoryLevel memory_level, const int device_id, DeviceAllocator *device_allocator)
#define CHECK(condition)
Definition: Logger.h:206
#define DEBUG_TIMER(name)
Definition: Logger.h:322
int get_column_id() const
Definition: Analyzer.h:195

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

const int8_t * ColumnFetcher::getOneTableColumnFragment ( const int  table_id,
const int  frag_id,
const int  col_id,
const std::map< int, const TableFragments * > &  all_tables_fragments,
std::list< std::shared_ptr< Chunk_NS::Chunk >> &  chunk_holder,
std::list< ChunkIter > &  chunk_iter_holder,
const Data_Namespace::MemoryLevel  memory_level,
const int  device_id,
DeviceAllocator *  device_allocator 
) const

Definition at line 207 of file ColumnFetcher.cpp.

References Allocator::alloc(), cat(), CHECK, CHECK_EQ, CHECK_GT, chunk_list_mutex_, DeviceAllocator::copyToDevice(), Data_Namespace::CPU_LEVEL, executor_, get_column_descriptor(), get_column_type(), Chunk_NS::Chunk::getChunk(), Data_Namespace::GPU_LEVEL, kENCODING_NONE, and varlen_chunk_fetch_mutex_.

Referenced by Executor::fetchChunks(), Executor::fetchUnionChunks(), and getAllTableColumnFragments().

216  {
217  const auto fragments_it = all_tables_fragments.find(table_id);
218  CHECK(fragments_it != all_tables_fragments.end());
219  const auto fragments = fragments_it->second;
220  const auto& fragment = (*fragments)[frag_id];
221  if (fragment.isEmptyPhysicalFragment()) {
222  return nullptr;
223  }
224  std::shared_ptr<Chunk_NS::Chunk> chunk;
225  auto chunk_meta_it = fragment.getChunkMetadataMap().find(col_id);
226  CHECK(chunk_meta_it != fragment.getChunkMetadataMap().end());
227  CHECK(table_id > 0);
228  const auto& cat = *executor_->getCatalog();
229  auto cd = get_column_descriptor(col_id, table_id, cat);
230  CHECK(cd);
231  const auto col_type =
232  get_column_type(col_id, table_id, cd, executor_->temporary_tables_);
233  const bool is_real_string =
234  col_type.is_string() && col_type.get_compression() == kENCODING_NONE;
235  const bool is_varlen =
236  is_real_string ||
237  col_type.is_array(); // TODO: should it be col_type.is_varlen_array() ?
238  {
239  ChunkKey chunk_key{
240  cat.getCurrentDB().dbId, fragment.physicalTableId, col_id, fragment.fragmentId};
241  std::unique_ptr<std::lock_guard<std::mutex>> varlen_chunk_lock;
242  if (is_varlen) {
243  varlen_chunk_lock.reset(new std::lock_guard<std::mutex>(varlen_chunk_fetch_mutex_));
244  }
245  chunk = Chunk_NS::Chunk::getChunk(
246  cd,
247  &cat.getDataMgr(),
248  chunk_key,
249  memory_level,
250  memory_level == Data_Namespace::CPU_LEVEL ? 0 : device_id,
251  chunk_meta_it->second->numBytes,
252  chunk_meta_it->second->numElements);
253  std::lock_guard<std::mutex> chunk_list_lock(chunk_list_mutex_);
254  chunk_holder.push_back(chunk);
255  }
256  if (is_varlen) {
257  CHECK_GT(table_id, 0);
258  CHECK(chunk_meta_it != fragment.getChunkMetadataMap().end());
259  chunk_iter_holder.push_back(chunk->begin_iterator(chunk_meta_it->second));
260  auto& chunk_iter = chunk_iter_holder.back();
261  if (memory_level == Data_Namespace::CPU_LEVEL) {
262  return reinterpret_cast<int8_t*>(&chunk_iter);
263  } else {
264  auto ab = chunk->getBuffer();
265  ab->pin();
266  auto& row_set_mem_owner = executor_->getRowSetMemoryOwner();
267  row_set_mem_owner->addVarlenInputBuffer(ab);
268  CHECK_EQ(Data_Namespace::GPU_LEVEL, memory_level);
269  CHECK(allocator);
270  auto chunk_iter_gpu = allocator->alloc(sizeof(ChunkIter));
271  allocator->copyToDevice(
272  chunk_iter_gpu, reinterpret_cast<int8_t*>(&chunk_iter), sizeof(ChunkIter));
273  return chunk_iter_gpu;
274  }
275  } else {
276  auto ab = chunk->getBuffer();
277  CHECK(ab->getMemoryPtr());
278  return ab->getMemoryPtr(); // @TODO(alex) change to use ChunkIter
279  }
280 }
#define CHECK_EQ(x, y)
Definition: Logger.h:214
std::vector< int > ChunkKey
Definition: types.h:37
std::string cat(Ts &&...args)
std::mutex varlen_chunk_fetch_mutex_
const SQLTypeInfo get_column_type(const int col_id, const int table_id, const ColumnDescriptor *cd, const TemporaryTables *temporary_tables)
Definition: Execute.h:236
Executor * executor_
#define CHECK_GT(x, y)
Definition: Logger.h:218
static std::shared_ptr< Chunk > getChunk(const ColumnDescriptor *cd, DataMgr *data_mgr, const ChunkKey &key, const MemoryLevel mem_level, const int deviceId, const size_t num_bytes, const size_t num_elems)
Definition: Chunk.cpp:28
std::mutex chunk_list_mutex_
#define CHECK(condition)
Definition: Logger.h:206
const ColumnDescriptor * get_column_descriptor(const int col_id, const int table_id, const Catalog_Namespace::Catalog &cat)
Definition: Execute.h:191

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

const int8_t * ColumnFetcher::getResultSetColumn ( const InputColDescriptor col_desc,
const Data_Namespace::MemoryLevel  memory_level,
const int  device_id,
DeviceAllocator *  device_allocator,
const size_t  thread_idx 
) const

Definition at line 346 of file ColumnFetcher.cpp.

References CHECK, executor_, get_temporary_table(), InputColDescriptor::getColId(), InputColDescriptor::getScanDesc(), and InputDescriptor::getTableId().

Referenced by Executor::fetchChunks(), and Executor::fetchUnionChunks().

351  {
352  CHECK(col_desc);
353  const auto table_id = col_desc->getScanDesc().getTableId();
354  return getResultSetColumn(get_temporary_table(executor_->temporary_tables_, table_id),
355  table_id,
356  col_desc->getColId(),
357  memory_level,
358  device_id,
359  device_allocator,
360  thread_idx);
361 }
const int8_t * getResultSetColumn(const InputColDescriptor *col_desc, const Data_Namespace::MemoryLevel memory_level, const int device_id, DeviceAllocator *device_allocator, const size_t thread_idx) const
Executor * executor_
const ResultSetPtr & get_temporary_table(const TemporaryTables *temporary_tables, const int table_id)
Definition: Execute.h:228
int getColId() const
int getTableId() const
#define CHECK(condition)
Definition: Logger.h:206
const InputDescriptor & getScanDesc() const

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

const int8_t * ColumnFetcher::getResultSetColumn ( const ResultSetPtr buffer,
const int  table_id,
const int  col_id,
const Data_Namespace::MemoryLevel  memory_level,
const int  device_id,
DeviceAllocator *  device_allocator,
const size_t  thread_idx 
) const
private

Definition at line 1015 of file ColumnFetcher.cpp.

References CHECK_GE, CHECK_NE, columnar_fetch_mutex_, anonymous_namespace{ColumnFetcher.cpp}::columnarize_result(), columnarized_table_cache_, executor_, run_benchmark_import::result, and transferColumnIfNeeded().

1022  {
1023  const ColumnarResults* result{nullptr};
1024  {
1025  std::lock_guard<std::mutex> columnar_conversion_guard(columnar_fetch_mutex_);
1026  if (columnarized_table_cache_.empty() || !columnarized_table_cache_.count(table_id)) {
1027  columnarized_table_cache_.insert(std::make_pair(
1028  table_id, std::unordered_map<int, std::shared_ptr<const ColumnarResults>>()));
1029  }
1030  auto& frag_id_to_result = columnarized_table_cache_[table_id];
1031  int frag_id = 0;
1032  if (frag_id_to_result.empty() || !frag_id_to_result.count(frag_id)) {
1033  frag_id_to_result.insert(std::make_pair(
1034  frag_id,
1035  std::shared_ptr<const ColumnarResults>(columnarize_result(
1036  executor_->row_set_mem_owner_, buffer, thread_idx, frag_id))));
1037  }
1038  CHECK_NE(size_t(0), columnarized_table_cache_.count(table_id));
1039  result = columnarized_table_cache_[table_id][frag_id].get();
1040  }
1041  CHECK_GE(col_id, 0);
1042  return transferColumnIfNeeded(
1043  result, col_id, executor_->getDataMgr(), memory_level, device_id, device_allocator);
1044 }
std::mutex columnar_fetch_mutex_
#define CHECK_GE(x, y)
Definition: Logger.h:219
ColumnCacheMap columnarized_table_cache_
Executor * executor_
#define CHECK_NE(x, y)
Definition: Logger.h:215
const ColumnarResults * columnarize_result(std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner, const ResultSetPtr &result, const size_t thread_idx, const int frag_id)
static const int8_t * transferColumnIfNeeded(const ColumnarResults *columnar_results, const int col_id, Data_Namespace::DataMgr *data_mgr, const Data_Namespace::MemoryLevel memory_level, const int device_id, DeviceAllocator *device_allocator)

+ Here is the call graph for this function:

const int8_t * ColumnFetcher::linearizeColumnFragments ( const int  table_id,
const int  col_id,
const std::map< int, const TableFragments * > &  all_tables_fragments,
std::list< std::shared_ptr< Chunk_NS::Chunk >> &  chunk_holder,
std::list< ChunkIter > &  chunk_iter_holder,
const Data_Namespace::MemoryLevel  memory_level,
const int  device_id,
DeviceAllocator *  device_allocator,
const size_t  thread_idx 
) const

Definition at line 363 of file ColumnFetcher.cpp.

References addMergedChunkIter(), Allocator::alloc(), cat(), CHECK, CHECK_EQ, CHECK_GT, chunk_list_mutex_, DeviceAllocator::copyToDevice(), Data_Namespace::CPU_LEVEL, DEBUG_TIMER, executor_, get_column_descriptor(), Chunk_NS::Chunk::getChunk(), getChunkiter(), anonymous_namespace{ColumnFetcher.cpp}::getMemoryLevelString(), InputColDescriptor::getScanDesc(), InputDescriptor::getSourceType(), Data_Namespace::GPU_LEVEL, linearization_mutex_, linearized_col_cache_mutex_, linearized_multi_frag_chunk_iter_cache_, linearizeFixedLenArrayColFrags(), linearizeVarLenArrayColFrags(), prepareChunkIter(), run_benchmark_import::res, TABLE, varlen_chunk_fetch_mutex_, and VLOG.

Referenced by Executor::fetchChunks().

// Linearizes a multi-fragment (varlen/array/non-encoded-text) column into a single
// merged chunk and returns a pointer to a ChunkIter over it (CPU pointer for CPU
// execution, device-resident copy for GPU execution).
// NOTE(review): this is a Doxygen source excerpt; lines that were hyperlinks in the
// generated HTML (orig. lines 436, 476, 496, 517) were dropped and are flagged below.
372  {
373  auto timer = DEBUG_TIMER(__func__);
374  const auto fragments_it = all_tables_fragments.find(table_id);
375  CHECK(fragments_it != all_tables_fragments.end());
376  const auto fragments = fragments_it->second;
377  const auto frag_count = fragments->size();
378  const InputColDescriptor col_desc(col_id, table_id, int(0));
379  const auto& cat = *executor_->getCatalog();
380  auto cd = get_column_descriptor(col_id, table_id, cat);
381  CHECK(cd);
382  CHECK(col_desc.getScanDesc().getSourceType() == InputSourceType::TABLE);
383  CHECK_GT(table_id, 0);
// True varlen (e.g. varlen array / none-encoded text); fixed-length arrays are
// excluded because they have no index buffer.
384  bool is_varlen_chunk = cd->columnType.is_varlen() && !cd->columnType.is_fixlen_array();
385  size_t total_num_tuples = 0;
386  size_t total_data_buf_size = 0;
387  size_t total_idx_buf_size = 0;
// Fast path: return a previously linearized chunk iterator from the cache.
388  {
389  std::lock_guard<std::mutex> linearize_guard(linearized_col_cache_mutex_);
390  auto linearized_iter_it = linearized_multi_frag_chunk_iter_cache_.find(col_desc);
391  if (linearized_iter_it != linearized_multi_frag_chunk_iter_cache_.end()) {
392  if (memory_level == CPU_LEVEL) {
393  // in CPU execution, each kernel can share the merged chunk since they operate in the
394  // same memory space, so we can share the same chunk iter among kernels
395  return getChunkiter(col_desc, 0);
396  } else {
397  // in GPU execution, this becomes the matter when we deploy multi-GPUs
398  // so we only share the chunk_iter iff kernels are launched on the same GPU device
399  // otherwise we need to separately load merged chunk and its iter
400  // todo(yoonmin): D2D copy of merged chunk and its iter?
401  if (linearized_iter_it->second.find(device_id) !=
402  linearized_iter_it->second.end()) {
403  // note that cached chunk_iter is located on CPU memory space...
404  // we just need to copy it to each device
405  // chunk_iter already contains correct buffer addr depending on execution device
406  auto chunk_iter_gpu = device_allocator->alloc(sizeof(ChunkIter));
407  device_allocator->copyToDevice(
408  chunk_iter_gpu, getChunkiter(col_desc, device_id), sizeof(ChunkIter));
409  return chunk_iter_gpu;
410  }
411  }
412  }
413  }
414 
415  // collect target fragments
416  // basically we load chunk in CPU first, and do necessary manipulation
417  // to make semantics of a merged chunk correctly
418  std::shared_ptr<Chunk_NS::Chunk> chunk;
419  std::list<std::shared_ptr<Chunk_NS::Chunk>> local_chunk_holder;
420  std::list<ChunkIter> local_chunk_iter_holder;
421  std::list<size_t> local_chunk_num_tuples;
422  {
423  std::lock_guard<std::mutex> linearize_guard(varlen_chunk_fetch_mutex_);
424  for (size_t frag_id = 0; frag_id < frag_count; ++frag_id) {
425  const auto& fragment = (*fragments)[frag_id];
426  if (fragment.isEmptyPhysicalFragment()) {
427  continue;
428  }
429  auto chunk_meta_it = fragment.getChunkMetadataMap().find(col_id);
430  CHECK(chunk_meta_it != fragment.getChunkMetadataMap().end());
431  ChunkKey chunk_key{
432  cat.getCurrentDB().dbId, fragment.physicalTableId, col_id, fragment.fragmentId};
433  chunk = Chunk_NS::Chunk::getChunk(cd,
434  &cat.getDataMgr(),
435  chunk_key,
// NOTE(review): orig. line 436 (the memory-level argument to getChunk — presumably
// Data_Namespace::CPU_LEVEL, since chunks are first loaded on CPU) was elided by
// the doc generator — verify against ColumnFetcher.cpp.
437  0,
438  chunk_meta_it->second->numBytes,
439  chunk_meta_it->second->numElements);
440  local_chunk_holder.push_back(chunk);
441  auto chunk_iter = chunk->begin_iterator(chunk_meta_it->second);
442  local_chunk_iter_holder.push_back(chunk_iter);
443  local_chunk_num_tuples.push_back(fragment.getNumTuples());
444  total_num_tuples += fragment.getNumTuples();
445  total_data_buf_size += chunk->getBuffer()->size();
446  std::ostringstream oss;
447  oss << "Load chunk for col_name: " << chunk->getColumnDesc()->columnName
448  << ", col_id: " << chunk->getColumnDesc()->columnId << ", Frag-" << frag_id
449  << ", numTuples: " << fragment.getNumTuples()
450  << ", data_size: " << chunk->getBuffer()->size();
451  if (chunk->getIndexBuf()) {
// Exclude the trailing sentinel offset entry from the accumulated index size.
452  auto idx_buf_size = chunk->getIndexBuf()->size() - sizeof(ArrayOffsetT);
453  oss << ", index_size: " << idx_buf_size;
454  total_idx_buf_size += idx_buf_size;
455  }
456  VLOG(2) << oss.str();
457  }
458  }
459 
460  auto& col_ti = cd->columnType;
461  MergedChunk res{nullptr, nullptr};
462  // Do linearize multi-fragmented column depending on column type
463  // We cover array and non-encoded text columns
464  // Note that geo column is actually organized as a set of arrays
465  // and each geo object has different set of vectors that they require
466  // Here, we linearize each array at a time, so eventually the geo object has a set of
467  // "linearized" arrays
468  {
469  std::lock_guard<std::mutex> linearization_guard(linearization_mutex_);
470  if (col_ti.is_array()) {
471  if (col_ti.is_fixlen_array()) {
472  VLOG(2) << "Linearize fixed-length multi-frag array column (col_id: "
473  << cd->columnId << ", col_name: " << cd->columnName
474  << ", device_type: " << getMemoryLevelString(memory_level)
475  << ", device_id: " << device_id << "): " << cd->columnType.to_string();
// NOTE(review): orig. line 476 (`res = linearizeFixedLenArrayColFrags(cat,` per the
// "References" list above) was elided by the doc generator.
477  chunk_holder,
478  chunk_iter_holder,
479  local_chunk_holder,
480  local_chunk_iter_holder,
481  local_chunk_num_tuples,
482  memory_level,
483  cd,
484  device_id,
485  total_data_buf_size,
486  total_idx_buf_size,
487  total_num_tuples,
488  device_allocator,
489  thread_idx);
490  } else {
491  CHECK(col_ti.is_varlen_array());
492  VLOG(2) << "Linearize variable-length multi-frag array column (col_id: "
493  << cd->columnId << ", col_name: " << cd->columnName
494  << ", device_type: " << getMemoryLevelString(memory_level)
495  << ", device_id: " << device_id << "): " << cd->columnType.to_string();
// NOTE(review): orig. line 496 (`res = linearizeVarLenArrayColFrags(cat,`) was
// elided by the doc generator.
497  chunk_holder,
498  chunk_iter_holder,
499  local_chunk_holder,
500  local_chunk_iter_holder,
501  local_chunk_num_tuples,
502  memory_level,
503  cd,
504  device_id,
505  total_data_buf_size,
506  total_idx_buf_size,
507  total_num_tuples,
508  device_allocator,
509  thread_idx);
510  }
511  }
512  if (col_ti.is_string() && !col_ti.is_dict_encoded_string()) {
513  VLOG(2) << "Linearize variable-length multi-frag non-encoded text column (col_id: "
514  << cd->columnId << ", col_name: " << cd->columnName
515  << ", device_type: " << getMemoryLevelString(memory_level)
516  << ", device_id: " << device_id << "): " << cd->columnType.to_string();
// NOTE(review): orig. line 517 (the `res = linearizeVarLenArrayColFrags(cat,` call —
// non-encoded text reuses the varlen path) was elided by the doc generator.
518  chunk_holder,
519  chunk_iter_holder,
520  local_chunk_holder,
521  local_chunk_iter_holder,
522  local_chunk_num_tuples,
523  memory_level,
524  cd,
525  device_id,
526  total_data_buf_size,
527  total_idx_buf_size,
528  total_num_tuples,
529  device_allocator,
530  thread_idx);
531  }
532  }
533  CHECK(res.first); // check merged data buffer
// Only true varlen columns carry an index buffer; fixed-length arrays do not.
534  if (!col_ti.is_fixlen_array()) {
535  CHECK(res.second); // check merged index buffer
536  }
537  auto merged_data_buffer = res.first;
538  auto merged_index_buffer = res.second;
539 
540  // prepare ChunkIter for the linearized chunk
541  auto merged_chunk =
542  std::make_shared<Chunk_NS::Chunk>(merged_data_buffer, merged_index_buffer, cd);
543  // to prepare chunk_iter for the merged chunk, we pass one of the local chunk iters
544  // to fill necessary metadata that is common to all merged chunks
545  auto merged_chunk_iter = prepareChunkIter(merged_data_buffer,
546  merged_index_buffer,
547  *(local_chunk_iter_holder.rbegin()),
548  is_varlen_chunk,
549  total_num_tuples);
550  {
551  std::lock_guard<std::mutex> chunk_list_lock(chunk_list_mutex_);
552  chunk_holder.push_back(merged_chunk);
553  chunk_iter_holder.push_back(merged_chunk_iter);
554  }
555 
556  auto merged_chunk_iter_ptr = reinterpret_cast<int8_t*>(&(chunk_iter_holder.back()));
557  if (memory_level == MemoryLevel::CPU_LEVEL) {
558  addMergedChunkIter(col_desc, 0, merged_chunk_iter_ptr);
559  return merged_chunk_iter_ptr;
560  } else {
561  CHECK_EQ(Data_Namespace::GPU_LEVEL, memory_level);
562  CHECK(device_allocator);
563  addMergedChunkIter(col_desc, device_id, merged_chunk_iter_ptr);
564  // note that merged_chunk_iter_ptr resides in CPU memory space
565  // having its content aware GPU buffer that we alloc. for merging
566  // so we need to copy this chunk_iter to each device explicitly
567  auto chunk_iter_gpu = device_allocator->alloc(sizeof(ChunkIter));
568  device_allocator->copyToDevice(
569  chunk_iter_gpu, merged_chunk_iter_ptr, sizeof(ChunkIter));
570  return chunk_iter_gpu;
571  }
572 }
#define CHECK_EQ(x, y)
Definition: Logger.h:214
std::vector< int > ChunkKey
Definition: types.h:37
std::string cat(Ts &&...args)
std::mutex varlen_chunk_fetch_mutex_
virtual void copyToDevice(int8_t *device_dst, const int8_t *host_src, const size_t num_bytes) const =0
virtual int8_t * alloc(const size_t num_bytes)=0
const int8_t * getChunkiter(const InputColDescriptor col_desc, const int device_id=0) const
Executor * executor_
std::mutex linearization_mutex_
#define CHECK_GT(x, y)
Definition: Logger.h:218
void addMergedChunkIter(const InputColDescriptor col_desc, const int device_id, int8_t *chunk_iter_ptr) const
MergedChunk linearizeVarLenArrayColFrags(const Catalog_Namespace::Catalog &cat, std::list< std::shared_ptr< Chunk_NS::Chunk >> &chunk_holder, std::list< ChunkIter > &chunk_iter_holder, std::list< std::shared_ptr< Chunk_NS::Chunk >> &local_chunk_holder, std::list< ChunkIter > &local_chunk_iter_holder, std::list< size_t > &local_chunk_num_tuples, MemoryLevel memory_level, const ColumnDescriptor *cd, const int device_id, const size_t total_data_buf_size, const size_t total_idx_buf_size, const size_t total_num_tuples, DeviceAllocator *device_allocator, const size_t thread_idx) const
MergedChunk linearizeFixedLenArrayColFrags(const Catalog_Namespace::Catalog &cat, std::list< std::shared_ptr< Chunk_NS::Chunk >> &chunk_holder, std::list< ChunkIter > &chunk_iter_holder, std::list< std::shared_ptr< Chunk_NS::Chunk >> &local_chunk_holder, std::list< ChunkIter > &local_chunk_iter_holder, std::list< size_t > &local_chunk_num_tuples, MemoryLevel memory_level, const ColumnDescriptor *cd, const int device_id, const size_t total_data_buf_size, const size_t total_idx_buf_size, const size_t total_num_tuples, DeviceAllocator *device_allocator, const size_t thread_idx) const
std::mutex linearized_col_cache_mutex_
static std::shared_ptr< Chunk > getChunk(const ColumnDescriptor *cd, DataMgr *data_mgr, const ChunkKey &key, const MemoryLevel mem_level, const int deviceId, const size_t num_bytes, const size_t num_elems)
Definition: Chunk.cpp:28
std::mutex chunk_list_mutex_
int32_t ArrayOffsetT
Definition: sqltypes.h:952
std::pair< AbstractBuffer *, AbstractBuffer * > MergedChunk
Definition: ColumnFetcher.h:47
#define CHECK(condition)
Definition: Logger.h:206
#define DEBUG_TIMER(name)
Definition: Logger.h:322
std::string getMemoryLevelString(Data_Namespace::MemoryLevel memoryLevel)
ChunkIter prepareChunkIter(AbstractBuffer *merged_data_buf, AbstractBuffer *merged_index_buf, ChunkIter &chunk_iter, bool is_true_varlen_type, const size_t total_num_tuples) const
std::unordered_map< InputColDescriptor, DeviceMergedChunkIterMap > linearized_multi_frag_chunk_iter_cache_
#define VLOG(n)
Definition: Logger.h:300
const ColumnDescriptor * get_column_descriptor(const int col_id, const int table_id, const Catalog_Namespace::Catalog &cat)
Definition: Execute.h:191

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

MergedChunk ColumnFetcher::linearizeFixedLenArrayColFrags ( const Catalog_Namespace::Catalog cat,
std::list< std::shared_ptr< Chunk_NS::Chunk >> &  chunk_holder,
std::list< ChunkIter > &  chunk_iter_holder,
std::list< std::shared_ptr< Chunk_NS::Chunk >> &  local_chunk_holder,
std::list< ChunkIter > &  local_chunk_iter_holder,
std::list< size_t > &  local_chunk_num_tuples,
MemoryLevel  memory_level,
const ColumnDescriptor cd,
const int  device_id,
const size_t  total_data_buf_size,
const size_t  total_idx_buf_size,
const size_t  total_num_tuples,
DeviceAllocator device_allocator,
const size_t  thread_idx 
) const
private

Definition at line 849 of file ColumnFetcher.cpp.

References Data_Namespace::DataMgr::alloc(), CHECK_EQ, check_interrupt(), Data_Namespace::CPU_LEVEL, Executor::ERR_INTERRUPTED, g_enable_non_kernel_time_query_interrupt, Catalog_Namespace::Catalog::getDataMgr(), anonymous_namespace{ColumnFetcher.cpp}::getMemoryLevelString(), timer_start(), timer_stop(), and VLOG.

Referenced by linearizeColumnFragments().

// Merges the data buffers of a fixed-length array column's fragments by simple
// concatenation into one buffer allocated at the requested memory level/device.
// Fixed-length arrays have no index buffer, hence the {buffer, nullptr} return.
// NOTE(review): this is a Doxygen source excerpt; hyperlinked lines were dropped
// (orig. 877-878 and 884), flagged below.
863  {
864  int64_t linearization_time_ms = 0;
865  auto clock_begin = timer_start();
866  // linearize collected fragments
867  auto merged_data_buffer =
868  cat.getDataMgr().alloc(memory_level, device_id, total_data_buf_size);
869  VLOG(2) << "Allocate " << total_data_buf_size
870  << " bytes of buffer space for linearized chunks (memory_level: "
871  << getMemoryLevelString(memory_level) << ", device_id: " << device_id << ")";
872  size_t sum_data_buf_size = 0;
873  auto chunk_holder_it = local_chunk_holder.begin();
874  auto chunk_iter_holder_it = local_chunk_iter_holder.begin();
875  for (; chunk_holder_it != local_chunk_holder.end();
876  chunk_holder_it++, chunk_iter_holder_it++) {
// NOTE(review): orig. lines 877-878 (per the "References" list: an interrupt check —
// likely `if (g_enable_non_kernel_time_query_interrupt && check_interrupt()) { throw
// QueryExecutionError(Executor::ERR_INTERRUPTED); }`) were elided by the doc generator.
879  }
880  auto target_chunk = chunk_holder_it->get();
881  auto target_chunk_data_buffer = target_chunk->getBuffer();
882  merged_data_buffer->append(target_chunk_data_buffer->getMemoryPtr(),
883  target_chunk_data_buffer->size(),
// NOTE(review): orig. line 884 (the source-buffer memory level argument, presumably
// Data_Namespace::CPU_LEVEL) was elided by the doc generator.
885  device_id);
886  sum_data_buf_size += target_chunk_data_buffer->size();
887  }
888  // check whether each chunk's data buffer is clean under chunk merging
889  CHECK_EQ(total_data_buf_size, sum_data_buf_size);
890  linearization_time_ms += timer_stop(clock_begin);
891  VLOG(2) << "Linearization has been successfully done, elapsed time: "
892  << linearization_time_ms << " ms.";
893  return {merged_data_buffer, nullptr};
894 }
#define CHECK_EQ(x, y)
Definition: Logger.h:214
static const int32_t ERR_INTERRUPTED
Definition: Execute.h:1142
Data_Namespace::DataMgr & getDataMgr() const
Definition: Catalog.h:223
TypeR::rep timer_stop(Type clock_begin)
Definition: measure.h:48
bool g_enable_non_kernel_time_query_interrupt
Definition: Execute.cpp:115
__device__ bool check_interrupt()
std::string getMemoryLevelString(Data_Namespace::MemoryLevel memoryLevel)
#define VLOG(n)
Definition: Logger.h:300
Type timer_start()
Definition: measure.h:42
AbstractBuffer * alloc(const MemoryLevel memoryLevel, const int deviceId, const size_t numBytes)
Definition: DataMgr.cpp:460

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

MergedChunk ColumnFetcher::linearizeVarLenArrayColFrags ( const Catalog_Namespace::Catalog cat,
std::list< std::shared_ptr< Chunk_NS::Chunk >> &  chunk_holder,
std::list< ChunkIter > &  chunk_iter_holder,
std::list< std::shared_ptr< Chunk_NS::Chunk >> &  local_chunk_holder,
std::list< ChunkIter > &  local_chunk_iter_holder,
std::list< size_t > &  local_chunk_num_tuples,
MemoryLevel  memory_level,
const ColumnDescriptor cd,
const int  device_id,
const size_t  total_data_buf_size,
const size_t  total_idx_buf_size,
const size_t  total_num_tuples,
DeviceAllocator device_allocator,
const size_t  thread_idx 
) const
private

Definition at line 574 of file ColumnFetcher.cpp.

References Data_Namespace::DataMgr::alloc(), Data_Namespace::AbstractBuffer::append(), CHECK, check_interrupt(), ColumnDescriptor::columnId, gpu_enabled::copy(), DeviceAllocator::copyToDevice(), Data_Namespace::CPU_LEVEL, cpu_threads(), ArrayNoneEncoder::DEFAULT_NULL_PADDING_SIZE, run_benchmark_import::dest, Executor::ERR_INTERRUPTED, g_enable_non_kernel_time_query_interrupt, g_enable_parallel_linearization, Catalog_Namespace::Catalog::getDataMgr(), anonymous_namespace{ColumnFetcher.cpp}::getMemoryLevelString(), Data_Namespace::AbstractBuffer::getMemoryPtr(), Data_Namespace::GPU_LEVEL, i, LIKELY, linearized_col_cache_mutex_, linearized_data_buf_cache_, linearized_idx_buf_cache_, linearlized_temporary_cpu_index_buf_cache_, makeIntervals(), gpu_enabled::sort(), ColumnDescriptor::tableId, timer_start(), timer_stop(), and VLOG.

Referenced by linearizeColumnFragments().

// Merges a variable-length (array / none-encoded text) column's fragments into one
// data buffer and one offset (index) buffer. The index buffer is first assembled and
// fixed up on CPU (offset rebasing, null-padding handling), then copied to the target
// device if needed; both buffers are cached per (column, device).
// NOTE(review): this is a Doxygen source excerpt; hyperlinked lines were dropped
// (orig. 626, 638, 648, 665-666, 693, 701, 732, 838), flagged below.
588  {
589  // for linearization of varlen col we have to deal with not only the data buffer
590  // but also its underlying index buffer which is responsible for offset of varlen value
591  // basically we maintain per-device linearized (data/index) buffer
592  // for data buffer, we linearize varlen col's chunks within a device-specific buffer
593  // by just appending each chunk
594  // for index buffer, we need to not only append each chunk but also modify the offset
595  // value to reflect various conditions like nullness, padding and so on so we first
596  // append index buffer in CPU, manipulate it as required and then copy it to specific
597  // device if necessary (for GPU execution)
598  AbstractBuffer* merged_index_buffer_in_cpu = nullptr;
599  AbstractBuffer* merged_data_buffer = nullptr;
600  bool has_cached_merged_idx_buf = false;
601  bool has_cached_merged_data_buf = false;
602  const InputColDescriptor icd(cd->columnId, cd->tableId, int(0));
603  // check linearized buffer's cache first
604  // if not exists, alloc necessary buffer space to prepare linearization
605  int64_t linearization_time_ms = 0;
606  auto clock_begin = timer_start();
607  {
608  std::lock_guard<std::mutex> linearized_col_cache_guard(linearized_col_cache_mutex_);
609  auto cached_data_buf_cache_it = linearized_data_buf_cache_.find(icd);
610  if (cached_data_buf_cache_it != linearized_data_buf_cache_.end()) {
611  auto& cd_cache = cached_data_buf_cache_it->second;
612  auto cached_data_buf_it = cd_cache.find(device_id);
613  if (cached_data_buf_it != cd_cache.end()) {
614  has_cached_merged_data_buf = true;
615  merged_data_buffer = cached_data_buf_it->second;
616  } else {
617  merged_data_buffer =
618  cat.getDataMgr().alloc(memory_level, device_id, total_data_buf_size);
619  VLOG(2) << "Allocate " << total_data_buf_size
620  << " bytes of buffer space for linearized chunks (memory_level: "
621  << getMemoryLevelString(memory_level) << ", device_id: " << device_id
622  << ")";
623  cd_cache.insert(std::make_pair(device_id, merged_data_buffer));
624  }
625  } else {
// NOTE(review): orig. line 626 (presumably `DeviceMergedChunkMap m;` — `m` is used
// below and the typedef appears in the symbol list) was elided by the doc generator.
627  merged_data_buffer =
628  cat.getDataMgr().alloc(memory_level, device_id, total_data_buf_size);
629  VLOG(2) << "Allocate " << total_data_buf_size
630  << " bytes of buffer space for linearized chunks (memory_level: "
631  << getMemoryLevelString(memory_level) << ", device_id: " << device_id
632  << ")";
633  m.insert(std::make_pair(device_id, merged_data_buffer));
634  linearized_data_buf_cache_.insert(std::make_pair(icd, m));
635  }
636 
637  auto cached_index_buf_it =
// NOTE(review): orig. line 638 (the lookup expression — presumably
// `linearlized_temporary_cpu_index_buf_cache_.find(cd->columnId);`) was elided.
639  if (cached_index_buf_it != linearlized_temporary_cpu_index_buf_cache_.end()) {
640  has_cached_merged_idx_buf = true;
641  merged_index_buffer_in_cpu = cached_index_buf_it->second;
642  } else {
// +sizeof(ArrayOffsetT): room for the trailing sentinel offset entry.
643  merged_index_buffer_in_cpu = cat.getDataMgr().alloc(
644  Data_Namespace::CPU_LEVEL, 0, total_idx_buf_size + sizeof(ArrayOffsetT));
// NOTE(review): this log prints total_data_buf_size but the allocation above is
// total_idx_buf_size + sizeof(ArrayOffsetT) — looks like a copy/paste slip; also
// "temorary" is a typo in the runtime string. Both left as-is here (comment-only edit).
645  VLOG(2) << "Allocate " << total_data_buf_size
646  << " bytes of temorary buffer space on CPU for linearized chunks";
647  // just copy the buf addr since we access it via the pointer itself
// NOTE(review): orig. line 648 (presumably
// `linearlized_temporary_cpu_index_buf_cache_.insert(`) was elided.
649  std::make_pair(cd->columnId, merged_index_buffer_in_cpu));
650  }
651  }
652 
653  // linearize buffers if we don't have corresponding buf in cache
654  size_t sum_data_buf_size = 0;
655  size_t sum_idx_buf_size = 0;
656  size_t cur_sum_num_tuples = 0;
657  size_t total_idx_size_modifier = 0;
658  auto chunk_holder_it = local_chunk_holder.begin();
659  auto chunk_iter_holder_it = local_chunk_iter_holder.begin();
660  auto chunk_num_tuple_it = local_chunk_num_tuples.begin();
661  bool null_padded_first_elem = false;
662  bool null_padded_last_val = false;
663  for (; chunk_holder_it != local_chunk_holder.end();
664  chunk_holder_it++, chunk_iter_holder_it++, chunk_num_tuple_it++) {
// NOTE(review): orig. lines 665-666 (per the "References" list: an interrupt check
// guarded by g_enable_non_kernel_time_query_interrupt) were elided.
667  }
668  auto target_chunk = chunk_holder_it->get();
669  auto target_chunk_data_buffer = target_chunk->getBuffer();
670  auto cur_chunk_num_tuples = *chunk_num_tuple_it;
671  auto target_chunk_idx_buffer = target_chunk->getIndexBuf();
672  auto target_idx_buf_ptr =
673  reinterpret_cast<ArrayOffsetT*>(target_chunk_idx_buffer->getMemoryPtr());
674  auto idx_buf_size = target_chunk_idx_buffer->size() - sizeof(ArrayOffsetT);
675  auto target_data_buffer_start_ptr = target_chunk_data_buffer->getMemoryPtr();
676  auto target_data_buffer_size = target_chunk_data_buffer->size();
677 
678  // when linearizing idx buffers, we need to consider the following cases
679  // 1. the first idx val is padded (a. null / b. empty varlen arr / c. 1-byte size
680  // varlen arr, i.e., {1})
681  // 2. the last idx val is null
682  // 3. null value(s) is/are located in a middle of idx buf <-- we don't need to care
683  if (cur_sum_num_tuples > 0 && target_idx_buf_ptr[0] > 0) {
684  null_padded_first_elem = true;
// Skip the leading padding bytes of this fragment's data buffer; remember how much
// we dropped so the final sentinel offset can be corrected at the end.
685  target_data_buffer_start_ptr += ArrayNoneEncoder::DEFAULT_NULL_PADDING_SIZE;
686  target_data_buffer_size -= ArrayNoneEncoder::DEFAULT_NULL_PADDING_SIZE;
687  total_idx_size_modifier += ArrayNoneEncoder::DEFAULT_NULL_PADDING_SIZE;
688  }
689  // we linearize data_buf in device-specific buffer
690  if (!has_cached_merged_data_buf) {
691  merged_data_buffer->append(target_data_buffer_start_ptr,
692  target_data_buffer_size,
// NOTE(review): orig. line 693 (the source memory-level argument, presumably
// Data_Namespace::CPU_LEVEL) was elided by the doc generator.
694  device_id);
695  }
696 
697  if (!has_cached_merged_idx_buf) {
698  // linearize idx buf in CPU first
699  merged_index_buffer_in_cpu->append(target_chunk_idx_buffer->getMemoryPtr(),
700  idx_buf_size,
// NOTE(review): orig. line 701 (the source memory-level argument) was elided.
702  0); // merged_index_buffer_in_cpu resides in CPU
703  auto idx_buf_ptr =
704  reinterpret_cast<ArrayOffsetT*>(merged_index_buffer_in_cpu->getMemoryPtr());
705  // here, we do not need to manipulate the very first idx buf, just let it as is
706  // and modify otherwise (i.e., starting from second chunk idx buf)
707  if (cur_sum_num_tuples > 0) {
708  if (null_padded_last_val) {
709  // case 2. the previous chunk's last index val is null so we need to set this
710  // chunk's first val to be null (negative offset encodes null)
711  idx_buf_ptr[cur_sum_num_tuples] = -sum_data_buf_size;
712  }
713  const size_t worker_count = cpu_threads();
714  std::vector<std::future<void>> conversion_threads;
// Per-worker scratch vectors so parallel workers never share a result vector.
715  std::vector<std::vector<size_t>> null_padded_row_idx_vecs(worker_count,
716  std::vector<size_t>());
717  bool is_parallel_modification = false;
718  std::vector<size_t> null_padded_row_idx_vec;
// Rebase this fragment's offsets by the bytes already merged; null rows (negative
// offsets) are only collected here and patched serially after the loop.
719  const auto do_work = [&cur_sum_num_tuples,
720  &sum_data_buf_size,
721  &null_padded_first_elem,
722  &idx_buf_ptr](
723  const size_t start,
724  const size_t end,
725  const bool is_parallel_modification,
726  std::vector<size_t>* null_padded_row_idx_vec) {
727  for (size_t i = start; i < end; i++) {
728  if (LIKELY(idx_buf_ptr[cur_sum_num_tuples + i] >= 0)) {
729  if (null_padded_first_elem) {
730  // deal with null padded bytes
731  idx_buf_ptr[cur_sum_num_tuples + i] -=
// NOTE(review): orig. line 732 (presumably
// `ArrayNoneEncoder::DEFAULT_NULL_PADDING_SIZE;`) was elided by the doc generator.
733  }
734  idx_buf_ptr[cur_sum_num_tuples + i] += sum_data_buf_size;
735  } else {
736  // null padded row needs to reference the previous row idx so in
737  // multi-threaded index modification we may suffer from thread
738  // contention when thread-i needs to reference thread-j's row idx so we
739  // collect row idxs for null rows here and deal with them after this
740  // step
741  null_padded_row_idx_vec->push_back(cur_sum_num_tuples + i);
742  }
743  }
744  };
// g_enable_parallel_linearization acts as the row-count threshold for going parallel.
745  if (cur_chunk_num_tuples > g_enable_parallel_linearization) {
746  is_parallel_modification = true;
747  for (auto interval :
748  makeIntervals(size_t(0), cur_chunk_num_tuples, worker_count)) {
749  conversion_threads.push_back(
750  std::async(std::launch::async,
751  do_work,
752  interval.begin,
753  interval.end,
754  is_parallel_modification,
755  &null_padded_row_idx_vecs[interval.index]));
756  }
757  for (auto& child : conversion_threads) {
758  child.wait();
759  }
760  for (auto& v : null_padded_row_idx_vecs) {
761  std::copy(v.begin(), v.end(), std::back_inserter(null_padded_row_idx_vec));
762  }
763  } else {
764  do_work(size_t(0),
765  cur_chunk_num_tuples,
766  is_parallel_modification,
767  &null_padded_row_idx_vec);
768  }
769  if (!null_padded_row_idx_vec.empty()) {
770  // modify null padded row idxs by referencing the previous row
771  // here we sort row idxs to correctly propagate modified row idxs
772  std::sort(null_padded_row_idx_vec.begin(), null_padded_row_idx_vec.end());
773  for (auto& padded_null_row_idx : null_padded_row_idx_vec) {
774  if (idx_buf_ptr[padded_null_row_idx - 1] > 0) {
775  idx_buf_ptr[padded_null_row_idx] = -idx_buf_ptr[padded_null_row_idx - 1];
776  } else {
777  idx_buf_ptr[padded_null_row_idx] = idx_buf_ptr[padded_null_row_idx - 1];
778  }
779  }
780  }
781  }
782  }
783  sum_idx_buf_size += idx_buf_size;
784  cur_sum_num_tuples += cur_chunk_num_tuples;
785  sum_data_buf_size += target_chunk_data_buffer->size();
// Remember whether this fragment ends with a null so the next fragment's first
// offset can be nulled accordingly (case 2 above).
786  if (target_idx_buf_ptr[*chunk_num_tuple_it] < 0) {
787  null_padded_last_val = true;
788  } else {
789  null_padded_last_val = false;
790  }
791  if (null_padded_first_elem) {
// The padding bytes were skipped when appending, so exclude them from the rebase total.
792  sum_data_buf_size -= ArrayNoneEncoder::DEFAULT_NULL_PADDING_SIZE;
793  null_padded_first_elem = false; // set for the next chunk
794  }
795  if (cur_sum_num_tuples == total_num_tuples) {
796  auto merged_index_buffer_ptr =
797  reinterpret_cast<ArrayOffsetT*>(merged_index_buffer_in_cpu->getMemoryPtr());
798  merged_index_buffer_ptr[total_num_tuples] =
799  total_data_buf_size -
800  total_idx_size_modifier; // last index value is total data size;
801  }
802  }
803 
804  // put linearized index buffer to per-device cache
805  std::lock_guard<std::mutex> linearized_col_cache_guard(linearized_col_cache_mutex_);
806  AbstractBuffer* merged_index_buffer = nullptr;
// Copies the CPU-side merged index buffer either with memcpy (CPU target) or via the
// device allocator (GPU target).
807  auto copyBuf =
808  [&device_allocator](
809  int8_t* src, int8_t* dest, size_t buf_size, MemoryLevel memory_level) {
810  if (memory_level == Data_Namespace::CPU_LEVEL) {
811  memcpy((void*)dest, src, buf_size);
812  } else {
813  CHECK(memory_level == Data_Namespace::GPU_LEVEL);
814  device_allocator->copyToDevice(dest, src, buf_size);
815  }
816  };
817  auto merged_idx_buf_cache_it = linearized_idx_buf_cache_.find(icd);
818  size_t buf_size = total_idx_buf_size + sizeof(ArrayOffsetT);
819  if (merged_idx_buf_cache_it != linearized_idx_buf_cache_.end()) {
820  auto& merged_idx_buf_cache = merged_idx_buf_cache_it->second;
821  auto merged_idx_buf_it = merged_idx_buf_cache.find(device_id);
822  if (merged_idx_buf_it != merged_idx_buf_cache.end()) {
823  merged_index_buffer = merged_idx_buf_it->second;
824  } else {
825  merged_index_buffer = cat.getDataMgr().alloc(memory_level, device_id, buf_size);
826  copyBuf(merged_index_buffer_in_cpu->getMemoryPtr(),
827  merged_index_buffer->getMemoryPtr(),
828  buf_size,
829  memory_level);
830  merged_idx_buf_cache.insert(std::make_pair(device_id, merged_index_buffer));
831  }
832  } else {
833  merged_index_buffer = cat.getDataMgr().alloc(memory_level, device_id, buf_size);
834  copyBuf(merged_index_buffer_in_cpu->getMemoryPtr(),
835  merged_index_buffer->getMemoryPtr(),
836  buf_size,
837  memory_level);
// NOTE(review): orig. line 838 (presumably `DeviceMergedChunkMap m;`) was elided by
// the doc generator.
839  m.insert(std::make_pair(device_id, merged_index_buffer));
840  linearized_idx_buf_cache_.insert(std::make_pair(icd, m));
841  }
842  CHECK(merged_index_buffer);
843  linearization_time_ms += timer_stop(clock_begin);
844  VLOG(2) << "Linearization has been successfully done, elapsed time: "
845  << linearization_time_ms << " ms.";
846  return {merged_data_buffer, merged_index_buffer};
847 }
std::unordered_map< int, AbstractBuffer * > DeviceMergedChunkMap
static const int32_t ERR_INTERRUPTED
Definition: Execute.h:1142
Data_Namespace::DataMgr & getDataMgr() const
Definition: Catalog.h:223
virtual int8_t * getMemoryPtr()=0
virtual void copyToDevice(int8_t *device_dst, const int8_t *host_src, const size_t num_bytes) const =0
DEVICE void sort(ARGS &&...args)
Definition: gpu_enabled.h:105
TypeR::rep timer_stop(Type clock_begin)
Definition: measure.h:48
Intervals< T > makeIntervals(T begin, T end, std::size_t n_workers)
Definition: Intervals.h:116
bool g_enable_non_kernel_time_query_interrupt
Definition: Execute.cpp:115
__device__ bool check_interrupt()
std::unordered_map< InputColDescriptor, DeviceMergedChunkMap > linearized_data_buf_cache_
std::unordered_map< int, AbstractBuffer * > linearlized_temporary_cpu_index_buf_cache_
DEVICE auto copy(ARGS &&...args)
Definition: gpu_enabled.h:51
#define LIKELY(x)
Definition: likely.h:24
std::mutex linearized_col_cache_mutex_
An AbstractBuffer is a unit of data management for a data manager.
static constexpr size_t DEFAULT_NULL_PADDING_SIZE
int32_t ArrayOffsetT
Definition: sqltypes.h:952
virtual void append(int8_t *src, const size_t num_bytes, const MemoryLevel src_buffer_type=CPU_LEVEL, const int device_id=-1)=0
#define CHECK(condition)
Definition: Logger.h:206
std::string getMemoryLevelString(Data_Namespace::MemoryLevel memoryLevel)
std::unordered_map< InputColDescriptor, DeviceMergedChunkMap > linearized_idx_buf_cache_
int cpu_threads()
Definition: thread_count.h:24
size_t g_enable_parallel_linearization
Definition: Execute.cpp:132
#define VLOG(n)
Definition: Logger.h:300
Type timer_start()
Definition: measure.h:42
AbstractBuffer * alloc(const MemoryLevel memoryLevel, const int deviceId, const size_t numBytes)
Definition: DataMgr.cpp:460

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

JoinColumn ColumnFetcher::makeJoinColumn ( Executor executor,
const Analyzer::ColumnVar hash_col,
const std::vector< Fragmenter_Namespace::FragmentInfo > &  fragments,
const Data_Namespace::MemoryLevel  effective_mem_lvl,
const int  device_id,
DeviceAllocator device_allocator,
const size_t  thread_idx,
std::vector< std::shared_ptr< Chunk_NS::Chunk >> &  chunks_owner,
std::vector< std::shared_ptr< void >> &  malloc_owner,
ColumnCacheMap column_cache 
)
static

Creates a JoinColumn struct containing an array of JoinChunk structs.

makeJoinColumn() creates a JoinColumn struct containing an array of JoinChunk structs, col_chunks_buff, malloced in CPU memory. Although the col_chunks_buff array is in CPU memory here, each JoinChunk struct contains an int8_t* pointer from getOneColumnFragment(), col_buff, that can point to either CPU memory or GPU memory depending on the effective_mem_lvl parameter. See also the fetchJoinColumn() function where col_chunks_buff is copied into GPU memory if needed. The malloc_owner parameter will have the malloced array appended. The chunks_owner parameter will be appended with the chunks.

Definition at line 153 of file ColumnFetcher.cpp.

References CHECK, CHECK_GT, check_interrupt(), checked_malloc(), JoinChunk::col_buff, Data_Namespace::CPU_LEVEL, Executor::ERR_INTERRUPTED, g_enable_non_kernel_time_query_interrupt, SQLTypeInfo::get_size(), Analyzer::Expr::get_type_info(), getOneColumnFragment(), and JoinChunk::num_elems.

Referenced by HashJoin::fetchJoinColumn().

163  {
164  CHECK(!fragments.empty());
165 
166  size_t col_chunks_buff_sz = sizeof(struct JoinChunk) * fragments.size();
167  // TODO: needs an allocator owner
168  auto col_chunks_buff = reinterpret_cast<int8_t*>(
169  malloc_owner.emplace_back(checked_malloc(col_chunks_buff_sz), free).get());
170  auto join_chunk_array = reinterpret_cast<struct JoinChunk*>(col_chunks_buff);
171 
172  size_t num_elems = 0;
173  size_t num_chunks = 0;
174  for (auto& frag : fragments) {
175  if (g_enable_non_kernel_time_query_interrupt && check_interrupt()) {
176  throw QueryExecutionError(Executor::ERR_INTERRUPTED);
177  }
178  auto [col_buff, elem_count] = getOneColumnFragment(
179  executor,
180  hash_col,
181  frag,
182  effective_mem_lvl,
183  effective_mem_lvl == Data_Namespace::CPU_LEVEL ? 0 : device_id,
184  device_allocator,
185  thread_idx,
186  chunks_owner,
187  column_cache);
188  if (col_buff != nullptr) {
189  num_elems += elem_count;
190  join_chunk_array[num_chunks] = JoinChunk{col_buff, elem_count};
191  } else {
192  continue;
193  }
194  ++num_chunks;
195  }
196 
197  int elem_sz = hash_col.get_type_info().get_size();
198  CHECK_GT(elem_sz, 0);
199 
200  return {col_chunks_buff,
201  col_chunks_buff_sz,
202  num_chunks,
203  num_elems,
204  static_cast<size_t>(elem_sz)};
205 }
HOST DEVICE int get_size() const
Definition: sqltypes.h:333
static const int32_t ERR_INTERRUPTED
Definition: Execute.h:1142
bool g_enable_non_kernel_time_query_interrupt
Definition: Execute.cpp:115
const int8_t * col_buff
#define CHECK_GT(x, y)
Definition: Logger.h:218
__device__ bool check_interrupt()
void * checked_malloc(const size_t size)
Definition: checked_alloc.h:45
const SQLTypeInfo & get_type_info() const
Definition: Analyzer.h:78
static std::pair< const int8_t *, size_t > getOneColumnFragment(Executor *executor, const Analyzer::ColumnVar &hash_col, const Fragmenter_Namespace::FragmentInfo &fragment, const Data_Namespace::MemoryLevel effective_mem_lvl, const int device_id, DeviceAllocator *device_allocator, const size_t thread_idx, std::vector< std::shared_ptr< Chunk_NS::Chunk >> &chunks_owner, ColumnCacheMap &column_cache)
Gets one chunk's pointer and element count on either CPU or GPU.
#define CHECK(condition)
Definition: Logger.h:206
size_t num_elems

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

ChunkIter ColumnFetcher::prepareChunkIter ( AbstractBuffer merged_data_buf,
AbstractBuffer merged_index_buf,
ChunkIter chunk_iter,
bool  is_true_varlen_type,
const size_t  total_num_tuples 
) const
private

Definition at line 957 of file ColumnFetcher.cpp.

References ChunkIter::current_pos, ChunkIter::end_pos, Data_Namespace::AbstractBuffer::getMemoryPtr(), ChunkIter::num_elems, ChunkIter::second_buf, Data_Namespace::AbstractBuffer::size(), ChunkIter::skip, ChunkIter::skip_size, ChunkIter::start_pos, and ChunkIter::type_info.

Referenced by linearizeColumnFragments().

961  {
962  ChunkIter merged_chunk_iter;
963  if (is_true_varlen_type) {
964  merged_chunk_iter.start_pos = merged_index_buf->getMemoryPtr();
965  merged_chunk_iter.current_pos = merged_index_buf->getMemoryPtr();
966  merged_chunk_iter.end_pos = merged_index_buf->getMemoryPtr() +
967  merged_index_buf->size() - sizeof(StringOffsetT);
968  merged_chunk_iter.second_buf = merged_data_buf->getMemoryPtr();
969  } else {
970  merged_chunk_iter.start_pos = merged_data_buf->getMemoryPtr();
971  merged_chunk_iter.current_pos = merged_data_buf->getMemoryPtr();
972  merged_chunk_iter.end_pos = merged_data_buf->getMemoryPtr() + merged_data_buf->size();
973  merged_chunk_iter.second_buf = nullptr;
974  }
975  merged_chunk_iter.num_elems = total_num_tuples;
976  merged_chunk_iter.skip = chunk_iter.skip;
977  merged_chunk_iter.skip_size = chunk_iter.skip_size;
978  merged_chunk_iter.type_info = chunk_iter.type_info;
979  return merged_chunk_iter;
980 }
int8_t * start_pos
Definition: ChunkIter.h:33
int8_t * current_pos
Definition: ChunkIter.h:32
SQLTypeInfo type_info
Definition: ChunkIter.h:30
virtual int8_t * getMemoryPtr()=0
int32_t StringOffsetT
Definition: sqltypes.h:951
int8_t * end_pos
Definition: ChunkIter.h:34
size_t num_elems
Definition: ChunkIter.h:37
int skip_size
Definition: ChunkIter.h:36
int8_t * second_buf
Definition: ChunkIter.h:31
int skip
Definition: ChunkIter.h:35

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

const int8_t * ColumnFetcher::transferColumnIfNeeded ( const ColumnarResults columnar_results,
const int  col_id,
Data_Namespace::DataMgr data_mgr,
const Data_Namespace::MemoryLevel  memory_level,
const int  device_id,
DeviceAllocator device_allocator 
)
staticprivate

Definition at line 896 of file ColumnFetcher.cpp.

References Allocator::alloc(), CHECK, CHECK_LT, DeviceAllocator::copyToDevice(), ColumnarResults::getColumnBuffers(), ColumnarResults::getColumnType(), Data_Namespace::GPU_LEVEL, and ColumnarResults::size().

Referenced by getAllTableColumnFragments(), getOneColumnFragment(), and getResultSetColumn().

902  {
903  if (!columnar_results) {
904  return nullptr;
905  }
906  const auto& col_buffers = columnar_results->getColumnBuffers();
907  CHECK_LT(static_cast<size_t>(col_id), col_buffers.size());
908  if (memory_level == Data_Namespace::GPU_LEVEL) {
909  const auto& col_ti = columnar_results->getColumnType(col_id);
910  const auto num_bytes = columnar_results->size() * col_ti.get_size();
911  CHECK(device_allocator);
912  auto gpu_col_buffer = device_allocator->alloc(num_bytes);
913  device_allocator->copyToDevice(gpu_col_buffer, col_buffers[col_id], num_bytes);
914  return gpu_col_buffer;
915  }
916  return col_buffers[col_id];
917 }
virtual void copyToDevice(int8_t *device_dst, const int8_t *host_src, const size_t num_bytes) const =0
virtual int8_t * alloc(const size_t num_bytes)=0
const size_t size() const
#define CHECK_LT(x, y)
Definition: Logger.h:216
#define CHECK(condition)
Definition: Logger.h:206
const std::vector< int8_t * > & getColumnBuffers() const
const SQLTypeInfo & getColumnType(const int col_id) const

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

Friends And Related Function Documentation

friend class QueryCompilationDescriptor
friend

Definition at line 200 of file ColumnFetcher.h.

friend class TableFunctionExecutionContext
friend

Definition at line 201 of file ColumnFetcher.h.

Member Data Documentation

std::mutex ColumnFetcher::chunk_list_mutex_
mutableprivate

Definition at line 184 of file ColumnFetcher.h.

Referenced by getOneTableColumnFragment(), and linearizeColumnFragments().

std::mutex ColumnFetcher::columnar_fetch_mutex_
mutableprivate

Definition at line 181 of file ColumnFetcher.h.

Referenced by getAllTableColumnFragments(), and getResultSetColumn().

std::unordered_map<InputColDescriptor, std::unique_ptr<const ColumnarResults> > ColumnFetcher::columnarized_scan_table_cache_
mutableprivate

Definition at line 188 of file ColumnFetcher.h.

Referenced by getAllTableColumnFragments().

ColumnCacheMap ColumnFetcher::columnarized_table_cache_
mutableprivate
std::mutex ColumnFetcher::linearization_mutex_
mutableprivate

Definition at line 183 of file ColumnFetcher.h.

Referenced by linearizeColumnFragments().

std::mutex ColumnFetcher::linearized_col_cache_mutex_
mutableprivate
std::unordered_map<InputColDescriptor, DeviceMergedChunkMap> ColumnFetcher::linearized_data_buf_cache_
mutableprivate

Definition at line 196 of file ColumnFetcher.h.

Referenced by freeLinearizedBuf(), and linearizeVarLenArrayColFrags().

std::unordered_map<InputColDescriptor, DeviceMergedChunkMap> ColumnFetcher::linearized_idx_buf_cache_
mutableprivate

Definition at line 198 of file ColumnFetcher.h.

Referenced by freeLinearizedBuf(), and linearizeVarLenArrayColFrags().

std::unordered_map<InputColDescriptor, DeviceMergedChunkIterMap> ColumnFetcher::linearized_multi_frag_chunk_iter_cache_
mutableprivate

Definition at line 192 of file ColumnFetcher.h.

Referenced by addMergedChunkIter(), getChunkiter(), and linearizeColumnFragments().

std::unordered_map<int, AbstractBuffer*> ColumnFetcher::linearlized_temporary_cpu_index_buf_cache_
mutableprivate
std::mutex ColumnFetcher::varlen_chunk_fetch_mutex_
mutableprivate

Definition at line 182 of file ColumnFetcher.h.

Referenced by getOneTableColumnFragment(), and linearizeColumnFragments().


The documentation for this class was generated from the following files: