OmniSciDB  c1a53651b2
ColumnFetcher Class Reference

#include <ColumnFetcher.h>


Public Member Functions

 ColumnFetcher (Executor *executor, const ColumnCacheMap &column_cache)
 
const int8_t * getOneTableColumnFragment (const shared::TableKey &table_key, const int frag_id, const int col_id, const std::map< shared::TableKey, const TableFragments * > &all_tables_fragments, std::list< std::shared_ptr< Chunk_NS::Chunk >> &chunk_holder, std::list< ChunkIter > &chunk_iter_holder, const Data_Namespace::MemoryLevel memory_level, const int device_id, DeviceAllocator *device_allocator) const
 
const int8_t * getAllTableColumnFragments (const shared::TableKey &table_key, const int col_id, const std::map< shared::TableKey, const TableFragments * > &all_tables_fragments, const Data_Namespace::MemoryLevel memory_level, const int device_id, DeviceAllocator *device_allocator, const size_t thread_idx) const
 
const int8_t * getResultSetColumn (const InputColDescriptor *col_desc, const Data_Namespace::MemoryLevel memory_level, const int device_id, DeviceAllocator *device_allocator, const size_t thread_idx) const
 
const int8_t * linearizeColumnFragments (const shared::TableKey &table_key, const int col_id, const std::map< shared::TableKey, const TableFragments * > &all_tables_fragments, std::list< std::shared_ptr< Chunk_NS::Chunk >> &chunk_holder, std::list< ChunkIter > &chunk_iter_holder, const Data_Namespace::MemoryLevel memory_level, const int device_id, DeviceAllocator *device_allocator, const size_t thread_idx) const
 
void freeTemporaryCpuLinearizedIdxBuf ()
 
void freeLinearizedBuf ()
 

Static Public Member Functions

static std::pair< const int8_t *, size_t > getOneColumnFragment (Executor *executor, const Analyzer::ColumnVar &hash_col, const Fragmenter_Namespace::FragmentInfo &fragment, const Data_Namespace::MemoryLevel effective_mem_lvl, const int device_id, DeviceAllocator *device_allocator, const size_t thread_idx, std::vector< std::shared_ptr< Chunk_NS::Chunk >> &chunks_owner, ColumnCacheMap &column_cache)
 Gets one chunk's pointer and element count on either CPU or GPU.
 
static JoinColumn makeJoinColumn (Executor *executor, const Analyzer::ColumnVar &hash_col, const std::vector< Fragmenter_Namespace::FragmentInfo > &fragments, const Data_Namespace::MemoryLevel effective_mem_lvl, const int device_id, DeviceAllocator *device_allocator, const size_t thread_idx, std::vector< std::shared_ptr< Chunk_NS::Chunk >> &chunks_owner, std::vector< std::shared_ptr< void >> &malloc_owner, ColumnCacheMap &column_cache)
 Creates a JoinColumn struct containing an array of JoinChunk structs.
 

Private Types

using DeviceMergedChunkIterMap = std::unordered_map< int, int8_t * >
 
using DeviceMergedChunkMap = std::unordered_map< int, AbstractBuffer * >
 

Private Member Functions

MergedChunk linearizeVarLenArrayColFrags (int32_t db_id, std::list< std::shared_ptr< Chunk_NS::Chunk >> &chunk_holder, std::list< ChunkIter > &chunk_iter_holder, std::list< std::shared_ptr< Chunk_NS::Chunk >> &local_chunk_holder, std::list< ChunkIter > &local_chunk_iter_holder, std::list< size_t > &local_chunk_num_tuples, MemoryLevel memory_level, const ColumnDescriptor *cd, const int device_id, const size_t total_data_buf_size, const size_t total_idx_buf_size, const size_t total_num_tuples, DeviceAllocator *device_allocator, const size_t thread_idx) const
 
MergedChunk linearizeFixedLenArrayColFrags (int32_t db_id, std::list< std::shared_ptr< Chunk_NS::Chunk >> &chunk_holder, std::list< ChunkIter > &chunk_iter_holder, std::list< std::shared_ptr< Chunk_NS::Chunk >> &local_chunk_holder, std::list< ChunkIter > &local_chunk_iter_holder, std::list< size_t > &local_chunk_num_tuples, MemoryLevel memory_level, const ColumnDescriptor *cd, const int device_id, const size_t total_data_buf_size, const size_t total_idx_buf_size, const size_t total_num_tuples, DeviceAllocator *device_allocator, const size_t thread_idx) const
 
void addMergedChunkIter (const InputColDescriptor col_desc, const int device_id, int8_t *chunk_iter_ptr) const
 
const int8_t * getChunkiter (const InputColDescriptor col_desc, const int device_id=0) const
 
ChunkIter prepareChunkIter (AbstractBuffer *merged_data_buf, AbstractBuffer *merged_index_buf, ChunkIter &chunk_iter, bool is_true_varlen_type, const size_t total_num_tuples) const
 
const int8_t * getResultSetColumn (const ResultSetPtr &buffer, const shared::TableKey &table_key, const int col_id, const Data_Namespace::MemoryLevel memory_level, const int device_id, DeviceAllocator *device_allocator, const size_t thread_idx) const
 

Static Private Member Functions

static const int8_t * transferColumnIfNeeded (const ColumnarResults *columnar_results, const int col_id, Data_Namespace::DataMgr *data_mgr, const Data_Namespace::MemoryLevel memory_level, const int device_id, DeviceAllocator *device_allocator)
 

Private Attributes

Executor * executor_
 
std::mutex columnar_fetch_mutex_
 
std::mutex varlen_chunk_fetch_mutex_
 
std::mutex linearization_mutex_
 
std::mutex chunk_list_mutex_
 
std::mutex linearized_col_cache_mutex_
 
ColumnCacheMap columnarized_table_cache_
 
std::unordered_map< InputColDescriptor, std::unique_ptr< const ColumnarResults > > columnarized_scan_table_cache_
 
std::unordered_map< InputColDescriptor, DeviceMergedChunkIterMap > linearized_multi_frag_chunk_iter_cache_
 
std::unordered_map< int, AbstractBuffer * > linearlized_temporary_cpu_index_buf_cache_
 
std::unordered_map< InputColDescriptor, DeviceMergedChunkMap > linearized_data_buf_cache_
 
std::unordered_map< InputColDescriptor, DeviceMergedChunkMap > linearized_idx_buf_cache_
 

Friends

class QueryCompilationDescriptor
 
class TableFunctionExecutionContext
 

Detailed Description

Definition at line 49 of file ColumnFetcher.h.
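
ColumnFetcher materializes the column buffers that query kernels read. It fetches per-fragment chunks at a requested memory level (CPU or GPU), columnarizes and caches intermediate result sets, and linearizes multi-fragment varlen columns into single merged buffers. The mutexes listed under Private Attributes guard these caches, so one instance can serve concurrent kernels.

A minimal construction sketch (hypothetical surrounding code; executor is assumed to be an initialized Executor*):

  // The map passed in seeds columnarized_table_cache_; an empty map is fine.
  ColumnCacheMap column_cache;
  ColumnFetcher column_fetcher(executor, column_cache);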

Member Typedef Documentation

using ColumnFetcher::DeviceMergedChunkIterMap = std::unordered_map<int, int8_t*>
private

Definition at line 189 of file ColumnFetcher.h.

using ColumnFetcher::DeviceMergedChunkMap = std::unordered_map<int, AbstractBuffer*>
private

Definition at line 190 of file ColumnFetcher.h.

Constructor & Destructor Documentation

ColumnFetcher::ColumnFetcher ( Executor *  executor,
const ColumnCacheMap &  column_cache 
)

Definition at line 61 of file ColumnFetcher.cpp.

62  : executor_(executor), columnarized_table_cache_(column_cache) {}

Member Function Documentation

void ColumnFetcher::addMergedChunkIter ( const InputColDescriptor  col_desc,
const int  device_id,
int8_t *  chunk_iter_ptr 
) const
private

Definition at line 1011 of file ColumnFetcher.cpp.

References InputColDescriptor::getColId(), InputColDescriptor::getScanDesc(), InputDescriptor::getTableKey(), linearized_col_cache_mutex_, linearized_multi_frag_chunk_iter_cache_, and VLOG.

Referenced by linearizeColumnFragments().

1013  {
1014  std::lock_guard<std::mutex> linearize_guard(linearized_col_cache_mutex_);
1015  auto chunk_iter_it = linearized_multi_frag_chunk_iter_cache_.find(col_desc);
1016  if (chunk_iter_it != linearized_multi_frag_chunk_iter_cache_.end()) {
1017  auto iter_device_it = chunk_iter_it->second.find(device_id);
1018  if (iter_device_it == chunk_iter_it->second.end()) {
1019  VLOG(2) << "Additional merged chunk_iter for col_desc (tbl: "
1020  << col_desc.getScanDesc().getTableKey() << ", col: " << col_desc.getColId()
1021  << "), device_id: " << device_id;
1022  chunk_iter_it->second.emplace(device_id, chunk_iter_ptr);
1023  }
1024  } else {
1025  DeviceMergedChunkIterMap iter_m;
1026  iter_m.emplace(device_id, chunk_iter_ptr);
1027  VLOG(2) << "New merged chunk_iter for col_desc (tbl: "
1028  << col_desc.getScanDesc().getTableKey() << ", col: " << col_desc.getColId()
1029  << "), device_id: " << device_id;
1030  linearized_multi_frag_chunk_iter_cache_.emplace(col_desc, iter_m);
1031  }
1032 }


void ColumnFetcher::freeLinearizedBuf ( )

Definition at line 1074 of file ColumnFetcher.cpp.

References CHECK, executor_, linearized_col_cache_mutex_, linearized_data_buf_cache_, and linearized_idx_buf_cache_.

Referenced by Executor::executeWorkUnitImpl().

1074  {
1075  std::lock_guard<std::mutex> linearized_col_cache_guard(linearized_col_cache_mutex_);
1076  CHECK(executor_);
1077  const auto data_mgr = executor_->getDataMgr();
1078 
1079  if (!linearized_data_buf_cache_.empty()) {
1080  for (auto& kv : linearized_data_buf_cache_) {
1081  for (auto& kv2 : kv.second) {
1082  data_mgr->free(kv2.second);
1083  }
1084  }
1085  }
1086 
1087  if (!linearized_idx_buf_cache_.empty()) {
1088  for (auto& kv : linearized_idx_buf_cache_) {
1089  for (auto& kv2 : kv.second) {
1090  data_mgr->free(kv2.second);
1091  }
1092  }
1093  }
1094 }


void ColumnFetcher::freeTemporaryCpuLinearizedIdxBuf ( )

Definition at line 1096 of file ColumnFetcher.cpp.

References CHECK, executor_, linearized_col_cache_mutex_, and linearlized_temporary_cpu_index_buf_cache_.

Referenced by Executor::executeWorkUnitImpl().

1096  {
1097  std::lock_guard<std::mutex> linearized_col_cache_guard(linearized_col_cache_mutex_);
1098  CHECK(executor_);
1099  const auto data_mgr = executor_->getDataMgr();
1100  if (!linearlized_temporary_cpu_index_buf_cache_.empty()) {
1101  for (auto& kv : linearlized_temporary_cpu_index_buf_cache_) {
1102  data_mgr->free(kv.second);
1103  }
1104  }
1105 }


const int8_t * ColumnFetcher::getAllTableColumnFragments ( const shared::TableKey &  table_key,
const int  col_id,
const std::map< shared::TableKey, const TableFragments * > &  all_tables_fragments,
const Data_Namespace::MemoryLevel  memory_level,
const int  device_id,
DeviceAllocator *  device_allocator,
const size_t  thread_idx 
) const

Definition at line 284 of file ColumnFetcher.cpp.

References CHECK, columnar_fetch_mutex_, columnarized_scan_table_cache_, Data_Namespace::CPU_LEVEL, shared::TableKey::db_id, Executor::ERR_INTERRUPTED, executor_, g_enable_non_kernel_time_query_interrupt, getOneTableColumnFragment(), InputColDescriptor::getScanDesc(), InputDescriptor::getSourceType(), ColumnarResults::mergeResults(), TABLE, shared::TableKey::table_id, and transferColumnIfNeeded().

Referenced by Executor::fetchChunks(), and Executor::fetchUnionChunks().

291  {
292  const auto fragments_it = all_tables_fragments.find(table_key);
293  CHECK(fragments_it != all_tables_fragments.end());
294  const auto fragments = fragments_it->second;
295  const auto frag_count = fragments->size();
296  std::vector<std::unique_ptr<ColumnarResults>> column_frags;
297  const ColumnarResults* table_column = nullptr;
298  const InputColDescriptor col_desc(col_id, table_key.table_id, table_key.db_id, int(0));
299  CHECK(col_desc.getScanDesc().getSourceType() == InputSourceType::TABLE);
300  {
301  std::lock_guard<std::mutex> columnar_conversion_guard(columnar_fetch_mutex_);
302  auto column_it = columnarized_scan_table_cache_.find(col_desc);
303  if (column_it == columnarized_scan_table_cache_.end()) {
304  for (size_t frag_id = 0; frag_id < frag_count; ++frag_id) {
305  if (g_enable_non_kernel_time_query_interrupt &&
306  executor_->checkNonKernelTimeInterrupted()) {
307  throw QueryExecutionError(Executor::ERR_INTERRUPTED);
308  }
309  std::list<std::shared_ptr<Chunk_NS::Chunk>> chunk_holder;
310  std::list<ChunkIter> chunk_iter_holder;
311  const auto& fragment = (*fragments)[frag_id];
312  if (fragment.isEmptyPhysicalFragment()) {
313  continue;
314  }
315  auto chunk_meta_it = fragment.getChunkMetadataMap().find(col_id);
316  CHECK(chunk_meta_it != fragment.getChunkMetadataMap().end());
317  auto col_buffer = getOneTableColumnFragment(table_key,
318  static_cast<int>(frag_id),
319  col_id,
320  all_tables_fragments,
321  chunk_holder,
322  chunk_iter_holder,
323  Data_Namespace::CPU_LEVEL,
324  int(0),
325  device_allocator);
326  column_frags.push_back(
327  std::make_unique<ColumnarResults>(executor_->row_set_mem_owner_,
328  col_buffer,
329  fragment.getNumTuples(),
330  chunk_meta_it->second->sqlType,
331  executor_->executor_id_,
332  thread_idx));
333  }
334  auto merged_results =
335  ColumnarResults::mergeResults(executor_->row_set_mem_owner_, column_frags);
336  table_column = merged_results.get();
337  columnarized_scan_table_cache_.emplace(col_desc, std::move(merged_results));
338  } else {
339  table_column = column_it->second.get();
340  }
341  }
342  return ColumnFetcher::transferColumnIfNeeded(table_column,
343  0,
344  executor_->getDataMgr(),
345  memory_level,
346  device_id,
347  device_allocator);
348 }

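A hedged usage sketch for this member: materializing one column across all fragments of a table into CPU memory. The names column_fetcher, table_key, col_id, and all_tables_fragments are assumed from the construction sketch above and the caller's context. The merged column lands in columnarized_scan_table_cache_, so a second call for the same column returns the cached buffer; the listing only uses the allocator on the GPU path, so passing nullptr is assumed safe for CPU-level fetches.

  const int8_t* whole_col =
      column_fetcher.getAllTableColumnFragments(table_key,
                                                col_id,
                                                all_tables_fragments,
                                                Data_Namespace::CPU_LEVEL,
                                                /*device_id=*/0,
                                                /*device_allocator=*/nullptr,
                                                /*thread_idx=*/0);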

const int8_t * ColumnFetcher::getChunkiter ( const InputColDescriptor  col_desc,
const int  device_id = 0 
) const
private

Definition at line 1034 of file ColumnFetcher.cpp.

References InputColDescriptor::getColId(), InputColDescriptor::getScanDesc(), InputDescriptor::getTableKey(), linearized_multi_frag_chunk_iter_cache_, and VLOG.

Referenced by linearizeColumnFragments().

1035  {
1036  auto linearized_chunk_iter_it = linearized_multi_frag_chunk_iter_cache_.find(col_desc);
1037  if (linearized_chunk_iter_it != linearized_multi_frag_chunk_iter_cache_.end()) {
1038  auto dev_iter_map_it = linearized_chunk_iter_it->second.find(device_id);
1039  if (dev_iter_map_it != linearized_chunk_iter_it->second.end()) {
1040  VLOG(2) << "Recycle merged chunk_iter for col_desc (tbl: "
1041  << col_desc.getScanDesc().getTableKey() << ", col: " << col_desc.getColId()
1042  << "), device_id: " << device_id;
1043  return dev_iter_map_it->second;
1044  }
1045  }
1046  return nullptr;
1047 }


std::pair< const int8_t *, size_t > ColumnFetcher::getOneColumnFragment ( Executor *  executor,
const Analyzer::ColumnVar &  hash_col,
const Fragmenter_Namespace::FragmentInfo &  fragment,
const Data_Namespace::MemoryLevel  effective_mem_lvl,
const int  device_id,
DeviceAllocator *  device_allocator,
const size_t  thread_idx,
std::vector< std::shared_ptr< Chunk_NS::Chunk >> &  chunks_owner,
ColumnCacheMap &  column_cache 
)
static

Gets one chunk's pointer and element count on either CPU or GPU.

Gets a column fragment chunk on CPU or on GPU depending on the effective memory level parameter. For temporary tables, the chunk will be copied to the GPU if needed. Returns a buffer pointer and an element count.
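
A hedged call sketch, assuming an initialized Executor* executor and an Analyzer::ColumnVar hash_col from a join qual, plus one Fragmenter_Namespace::FragmentInfo fragment (all names hypothetical):

  std::vector<std::shared_ptr<Chunk_NS::Chunk>> chunks_owner;  // keeps the chunk pinned
  ColumnCacheMap column_cache;
  auto [col_buff, num_elems] =
      ColumnFetcher::getOneColumnFragment(executor,
                                          hash_col,
                                          fragment,
                                          Data_Namespace::CPU_LEVEL,
                                          /*device_id=*/0,
                                          /*device_allocator=*/nullptr,
                                          /*thread_idx=*/0,
                                          chunks_owner,
                                          column_cache);

The returned pointer is only valid while chunks_owner keeps the chunk alive.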

Definition at line 67 of file ColumnFetcher.cpp.

References CHECK, shared::ColumnKey::column_id, anonymous_namespace{ColumnFetcher.cpp}::columnarize_result(), Data_Namespace::CPU_LEVEL, shared::TableKey::db_id, DEBUG_TIMER, Fragmenter_Namespace::FragmentInfo::fragmentId, get_column_descriptor_maybe(), get_temporary_table(), Chunk_NS::Chunk::getChunk(), Fragmenter_Namespace::FragmentInfo::getChunkMetadataMap(), Analyzer::ColumnVar::getColumnKey(), Fragmenter_Namespace::FragmentInfo::getNumTuples(), Fragmenter_Namespace::FragmentInfo::isEmptyPhysicalFragment(), Fragmenter_Namespace::FragmentInfo::physicalTableId, and transferColumnIfNeeded().

Referenced by RelAlgExecutor::createWindowFunctionContext(), TableFunctionExecutionContext::execute(), and makeJoinColumn().

76  {
77  static std::mutex columnar_conversion_mutex;
78  auto timer = DEBUG_TIMER(__func__);
79  if (fragment.isEmptyPhysicalFragment()) {
80  return {nullptr, 0};
81  }
82  const auto& column_key = hash_col.getColumnKey();
83  const auto cd = get_column_descriptor_maybe(column_key);
84  CHECK(!cd || !(cd->isVirtualCol));
85  const int8_t* col_buff = nullptr;
86  if (cd) { // real table
87  /* chunk_meta_it is used here to retrieve chunk numBytes and
88  numElements. Apparently, their values are often zeros. If we
89  knew how to predict the zero values, calling
90  getChunkMetadataMap could be avoided to skip
91  synthesize_metadata calls. */
92  auto chunk_meta_it = fragment.getChunkMetadataMap().find(column_key.column_id);
93  CHECK(chunk_meta_it != fragment.getChunkMetadataMap().end());
94  ChunkKey chunk_key{column_key.db_id,
95  fragment.physicalTableId,
96  column_key.column_id,
97  fragment.fragmentId};
98  const auto chunk = Chunk_NS::Chunk::getChunk(
99  cd,
100  executor->getDataMgr(),
101  chunk_key,
102  effective_mem_lvl,
103  effective_mem_lvl == Data_Namespace::CPU_LEVEL ? 0 : device_id,
104  chunk_meta_it->second->numBytes,
105  chunk_meta_it->second->numElements);
106  chunks_owner.push_back(chunk);
107  CHECK(chunk);
108  auto ab = chunk->getBuffer();
109  CHECK(ab->getMemoryPtr());
110  col_buff = reinterpret_cast<int8_t*>(ab->getMemoryPtr());
111  } else { // temporary table
112  const ColumnarResults* col_frag{nullptr};
113  {
114  std::lock_guard<std::mutex> columnar_conversion_guard(columnar_conversion_mutex);
115  const auto frag_id = fragment.fragmentId;
116  shared::TableKey table_key{column_key.db_id, column_key.table_id};
117  if (column_cache.empty() || !column_cache.count(table_key)) {
118  column_cache.insert(std::make_pair(
119  table_key,
120  std::unordered_map<int, std::shared_ptr<const ColumnarResults>>()));
121  }
122  auto& frag_id_to_result = column_cache[table_key];
123  if (frag_id_to_result.empty() || !frag_id_to_result.count(frag_id)) {
124  frag_id_to_result.insert(std::make_pair(
125  frag_id,
126  std::shared_ptr<const ColumnarResults>(columnarize_result(
127  executor->row_set_mem_owner_,
128  get_temporary_table(executor->temporary_tables_, table_key.table_id),
129  executor->executor_id_,
130  thread_idx,
131  frag_id))));
132  }
133  col_frag = column_cache[table_key][frag_id].get();
134  }
135  col_buff = transferColumnIfNeeded(
136  col_frag,
137  hash_col.getColumnKey().column_id,
138  executor->getDataMgr(),
139  effective_mem_lvl,
140  effective_mem_lvl == Data_Namespace::CPU_LEVEL ? 0 : device_id,
141  device_allocator);
142  }
143  return {col_buff, fragment.getNumTuples()};
144 }


const int8_t * ColumnFetcher::getOneTableColumnFragment ( const shared::TableKey &  table_key,
const int  frag_id,
const int  col_id,
const std::map< shared::TableKey, const TableFragments * > &  all_tables_fragments,
std::list< std::shared_ptr< Chunk_NS::Chunk >> &  chunk_holder,
std::list< ChunkIter > &  chunk_iter_holder,
const Data_Namespace::MemoryLevel  memory_level,
const int  device_id,
DeviceAllocator *  device_allocator 
) const

Definition at line 210 of file ColumnFetcher.cpp.

References Allocator::alloc(), CHECK, CHECK_EQ, CHECK_GT, chunk_list_mutex_, DeviceAllocator::copyToDevice(), Data_Namespace::CPU_LEVEL, shared::TableKey::db_id, executor_, get_column_descriptor(), get_column_type(), Chunk_NS::Chunk::getChunk(), Data_Namespace::GPU_LEVEL, kENCODING_NONE, shared::TableKey::table_id, and varlen_chunk_fetch_mutex_.

Referenced by Executor::fetchChunks(), Executor::fetchUnionChunks(), and getAllTableColumnFragments().

219  {
220  const auto fragments_it = all_tables_fragments.find(table_key);
221  CHECK(fragments_it != all_tables_fragments.end());
222  const auto fragments = fragments_it->second;
223  const auto& fragment = (*fragments)[frag_id];
224  if (fragment.isEmptyPhysicalFragment()) {
225  return nullptr;
226  }
227  std::shared_ptr<Chunk_NS::Chunk> chunk;
228  auto chunk_meta_it = fragment.getChunkMetadataMap().find(col_id);
229  CHECK(chunk_meta_it != fragment.getChunkMetadataMap().end());
230  CHECK(table_key.table_id > 0);
231  const auto cd = get_column_descriptor({table_key, col_id});
232  CHECK(cd);
233  const auto col_type =
234  get_column_type(col_id, table_key.table_id, cd, executor_->temporary_tables_);
235  const bool is_real_string =
236  col_type.is_string() && col_type.get_compression() == kENCODING_NONE;
237  const bool is_varlen =
238  is_real_string ||
239  col_type.is_array(); // TODO: should it be col_type.is_varlen_array() ?
240  {
241  ChunkKey chunk_key{
242  table_key.db_id, fragment.physicalTableId, col_id, fragment.fragmentId};
243  std::unique_ptr<std::lock_guard<std::mutex>> varlen_chunk_lock;
244  if (is_varlen) {
245  varlen_chunk_lock.reset(new std::lock_guard<std::mutex>(varlen_chunk_fetch_mutex_));
246  }
247  chunk = Chunk_NS::Chunk::getChunk(
248  cd,
249  executor_->getDataMgr(),
250  chunk_key,
251  memory_level,
252  memory_level == Data_Namespace::CPU_LEVEL ? 0 : device_id,
253  chunk_meta_it->second->numBytes,
254  chunk_meta_it->second->numElements);
255  std::lock_guard<std::mutex> chunk_list_lock(chunk_list_mutex_);
256  chunk_holder.push_back(chunk);
257  }
258  if (is_varlen) {
259  CHECK_GT(table_key.table_id, 0);
260  CHECK(chunk_meta_it != fragment.getChunkMetadataMap().end());
261  chunk_iter_holder.push_back(chunk->begin_iterator(chunk_meta_it->second));
262  auto& chunk_iter = chunk_iter_holder.back();
263  if (memory_level == Data_Namespace::CPU_LEVEL) {
264  return reinterpret_cast<int8_t*>(&chunk_iter);
265  } else {
266  auto ab = chunk->getBuffer();
267  ab->pin();
268  auto& row_set_mem_owner = executor_->getRowSetMemoryOwner();
269  row_set_mem_owner->addVarlenInputBuffer(ab);
270  CHECK_EQ(Data_Namespace::GPU_LEVEL, memory_level);
271  CHECK(allocator);
272  auto chunk_iter_gpu = allocator->alloc(sizeof(ChunkIter));
273  allocator->copyToDevice(
274  chunk_iter_gpu, reinterpret_cast<int8_t*>(&chunk_iter), sizeof(ChunkIter));
275  return chunk_iter_gpu;
276  }
277  } else {
278  auto ab = chunk->getBuffer();
279  CHECK(ab->getMemoryPtr());
280  return ab->getMemoryPtr(); // @TODO(alex) change to use ChunkIter
281  }
282 }

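Note the return convention visible in the listing: for fixed-width columns the returned pointer addresses the chunk's data buffer directly, while for varlen columns (none-encoded strings, arrays) it points at a ChunkIter, which is copied to the device for GPU-level fetches. A hedged sketch of the varlen CPU case, reusing the hypothetical names from the sketches above:

  std::list<std::shared_ptr<Chunk_NS::Chunk>> chunk_holder;
  std::list<ChunkIter> chunk_iter_holder;
  const int8_t* res = column_fetcher.getOneTableColumnFragment(
      table_key, /*frag_id=*/0, col_id, all_tables_fragments,
      chunk_holder, chunk_iter_holder,
      Data_Namespace::CPU_LEVEL, /*device_id=*/0, /*device_allocator=*/nullptr);
  // For a varlen column fetched to CPU, res actually points at a ChunkIter:
  auto* chunk_iter = reinterpret_cast<const ChunkIter*>(res);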

const int8_t * ColumnFetcher::getResultSetColumn ( const InputColDescriptor *  col_desc,
const Data_Namespace::MemoryLevel  memory_level,
const int  device_id,
DeviceAllocator *  device_allocator,
const size_t  thread_idx 
) const

Definition at line 350 of file ColumnFetcher.cpp.

References CHECK, executor_, get_temporary_table(), InputColDescriptor::getColId(), InputColDescriptor::getScanDesc(), and InputDescriptor::getTableKey().

Referenced by Executor::fetchChunks(), and Executor::fetchUnionChunks().

355  {
356  CHECK(col_desc);
357  const auto table_key = col_desc->getScanDesc().getTableKey();
358  return getResultSetColumn(
359  get_temporary_table(executor_->temporary_tables_, table_key.table_id),
360  table_key,
361  col_desc->getColId(),
362  memory_level,
363  device_id,
364  device_allocator,
365  thread_idx);
366 }


const int8_t * ColumnFetcher::getResultSetColumn ( const ResultSetPtr &  buffer,
const shared::TableKey &  table_key,
const int  col_id,
const Data_Namespace::MemoryLevel  memory_level,
const int  device_id,
DeviceAllocator *  device_allocator,
const size_t  thread_idx 
) const
private

Definition at line 1107 of file ColumnFetcher.cpp.

References CHECK_GE, CHECK_NE, columnar_fetch_mutex_, anonymous_namespace{ColumnFetcher.cpp}::columnarize_result(), columnarized_table_cache_, executor_, run_benchmark_import::result, and transferColumnIfNeeded().

1114  {
1115  const ColumnarResults* result{nullptr};
1116  {
1117  std::lock_guard<std::mutex> columnar_conversion_guard(columnar_fetch_mutex_);
1118  if (columnarized_table_cache_.empty() ||
1119  !columnarized_table_cache_.count(table_key)) {
1120  columnarized_table_cache_.insert(std::make_pair(
1121  table_key, std::unordered_map<int, std::shared_ptr<const ColumnarResults>>()));
1122  }
1123  auto& frag_id_to_result = columnarized_table_cache_[table_key];
1124  int frag_id = 0;
1125  if (frag_id_to_result.empty() || !frag_id_to_result.count(frag_id)) {
1126  frag_id_to_result.insert(
1127  std::make_pair(frag_id,
1128  std::shared_ptr<const ColumnarResults>(
1129  columnarize_result(executor_->row_set_mem_owner_,
1130  buffer,
1131  executor_->executor_id_,
1132  thread_idx,
1133  frag_id))));
1134  }
1135  CHECK_NE(size_t(0), columnarized_table_cache_.count(table_key));
1136  result = columnarized_table_cache_[table_key][frag_id].get();
1137  }
1138  CHECK_GE(col_id, 0);
1139  return transferColumnIfNeeded(
1140  result, col_id, executor_->getDataMgr(), memory_level, device_id, device_allocator);
1141 }


const int8_t * ColumnFetcher::linearizeColumnFragments ( const shared::TableKey &  table_key,
const int  col_id,
const std::map< shared::TableKey, const TableFragments * > &  all_tables_fragments,
std::list< std::shared_ptr< Chunk_NS::Chunk >> &  chunk_holder,
std::list< ChunkIter > &  chunk_iter_holder,
const Data_Namespace::MemoryLevel  memory_level,
const int  device_id,
DeviceAllocator *  device_allocator,
const size_t  thread_idx 
) const

Definition at line 368 of file ColumnFetcher.cpp.

References addMergedChunkIter(), Allocator::alloc(), CHECK, CHECK_EQ, CHECK_GT, chunk_list_mutex_, DeviceAllocator::copyToDevice(), Data_Namespace::CPU_LEVEL, shared::TableKey::db_id, DEBUG_TIMER, executor_, get_column_descriptor(), Chunk_NS::Chunk::getChunk(), getChunkiter(), anonymous_namespace{ColumnFetcher.cpp}::getMemoryLevelString(), InputColDescriptor::getScanDesc(), InputDescriptor::getSourceType(), Data_Namespace::GPU_LEVEL, linearization_mutex_, linearized_col_cache_mutex_, linearized_multi_frag_chunk_iter_cache_, linearizeFixedLenArrayColFrags(), linearizeVarLenArrayColFrags(), prepareChunkIter(), run_benchmark_import::res, TABLE, shared::TableKey::table_id, varlen_chunk_fetch_mutex_, and VLOG.

Referenced by Executor::fetchChunks().

377  {
378  auto timer = DEBUG_TIMER(__func__);
379  const auto fragments_it = all_tables_fragments.find(table_key);
380  CHECK(fragments_it != all_tables_fragments.end());
381  const auto fragments = fragments_it->second;
382  const auto frag_count = fragments->size();
383  const InputColDescriptor col_desc(col_id, table_key.table_id, table_key.db_id, int(0));
384  const auto cd = get_column_descriptor({table_key, col_id});
385  CHECK(cd);
386  CHECK(col_desc.getScanDesc().getSourceType() == InputSourceType::TABLE);
387  CHECK_GT(table_key.table_id, 0);
388  bool is_varlen_chunk = cd->columnType.is_varlen() && !cd->columnType.is_fixlen_array();
389  size_t total_num_tuples = 0;
390  size_t total_data_buf_size = 0;
391  size_t total_idx_buf_size = 0;
392  {
393  std::lock_guard<std::mutex> linearize_guard(linearized_col_cache_mutex_);
394  auto linearized_iter_it = linearized_multi_frag_chunk_iter_cache_.find(col_desc);
395  if (linearized_iter_it != linearized_multi_frag_chunk_iter_cache_.end()) {
396  if (memory_level == CPU_LEVEL) {
397  // in CPU execution, each kernel can share merged chunk since they operates in the
398  // same memory space, so we can share the same chunk iter among kernels
399  return getChunkiter(col_desc, 0);
400  } else {
401  // in GPU execution, this becomes the matter when we deploy multi-GPUs
402  // so we only share the chunk_iter iff kernels are launched on the same GPU device
403  // otherwise we need to separately load merged chunk and its iter
404  // todo(yoonmin): D2D copy of merged chunk and its iter?
405  if (linearized_iter_it->second.find(device_id) !=
406  linearized_iter_it->second.end()) {
407  // note that cached chunk_iter is located on CPU memory space...
408  // we just need to copy it to each device
409  // chunk_iter already contains correct buffer addr depending on execution device
410  auto chunk_iter_gpu = device_allocator->alloc(sizeof(ChunkIter));
411  device_allocator->copyToDevice(
412  chunk_iter_gpu, getChunkiter(col_desc, device_id), sizeof(ChunkIter));
413  return chunk_iter_gpu;
414  }
415  }
416  }
417  }
418 
419  // collect target fragments
420  // basically we load chunk in CPU first, and do necessary manipulation
421  // to make semantics of a merged chunk correctly
422  std::shared_ptr<Chunk_NS::Chunk> chunk;
423  std::list<std::shared_ptr<Chunk_NS::Chunk>> local_chunk_holder;
424  std::list<ChunkIter> local_chunk_iter_holder;
425  std::list<size_t> local_chunk_num_tuples;
426  {
427  std::lock_guard<std::mutex> linearize_guard(varlen_chunk_fetch_mutex_);
428  for (size_t frag_id = 0; frag_id < frag_count; ++frag_id) {
429  const auto& fragment = (*fragments)[frag_id];
430  if (fragment.isEmptyPhysicalFragment()) {
431  continue;
432  }
433  auto chunk_meta_it = fragment.getChunkMetadataMap().find(col_id);
434  CHECK(chunk_meta_it != fragment.getChunkMetadataMap().end());
435  ChunkKey chunk_key{
436  table_key.db_id, fragment.physicalTableId, col_id, fragment.fragmentId};
437  chunk = Chunk_NS::Chunk::getChunk(cd,
438  executor_->getDataMgr(),
439  chunk_key,
440  Data_Namespace::CPU_LEVEL,
441  0,
442  chunk_meta_it->second->numBytes,
443  chunk_meta_it->second->numElements);
444  local_chunk_holder.push_back(chunk);
445  auto chunk_iter = chunk->begin_iterator(chunk_meta_it->second);
446  local_chunk_iter_holder.push_back(chunk_iter);
447  local_chunk_num_tuples.push_back(fragment.getNumTuples());
448  total_num_tuples += fragment.getNumTuples();
449  total_data_buf_size += chunk->getBuffer()->size();
450  std::ostringstream oss;
451  oss << "Load chunk for col_name: " << chunk->getColumnDesc()->columnName
452  << ", col_id: " << chunk->getColumnDesc()->columnId << ", Frag-" << frag_id
453  << ", numTuples: " << fragment.getNumTuples()
454  << ", data_size: " << chunk->getBuffer()->size();
455  if (chunk->getIndexBuf()) {
456  auto idx_buf_size = chunk->getIndexBuf()->size() - sizeof(ArrayOffsetT);
457  oss << ", index_size: " << idx_buf_size;
458  total_idx_buf_size += idx_buf_size;
459  }
460  VLOG(2) << oss.str();
461  }
462  }
463 
464  auto& col_ti = cd->columnType;
465  MergedChunk res{nullptr, nullptr};
466  // Do linearize multi-fragmented column depending on column type
467  // We cover array and non-encoded text columns
468  // Note that geo column is actually organized as a set of arrays
469  // and each geo object has different set of vectors that they require
470  // Here, we linearize each array at a time, so eventually the geo object has a set of
471  // "linearized" arrays
472  {
473  std::lock_guard<std::mutex> linearization_guard(linearization_mutex_);
474  if (col_ti.is_array()) {
475  if (col_ti.is_fixlen_array()) {
476  VLOG(2) << "Linearize fixed-length multi-frag array column (col_id: "
477  << cd->columnId << ", col_name: " << cd->columnName
478  << ", device_type: " << getMemoryLevelString(memory_level)
479  << ", device_id: " << device_id << "): " << cd->columnType.to_string();
480  res = linearizeFixedLenArrayColFrags(table_key.db_id,
481  chunk_holder,
482  chunk_iter_holder,
483  local_chunk_holder,
484  local_chunk_iter_holder,
485  local_chunk_num_tuples,
486  memory_level,
487  cd,
488  device_id,
489  total_data_buf_size,
490  total_idx_buf_size,
491  total_num_tuples,
492  device_allocator,
493  thread_idx);
494  } else {
495  CHECK(col_ti.is_varlen_array());
496  VLOG(2) << "Linearize variable-length multi-frag array column (col_id: "
497  << cd->columnId << ", col_name: " << cd->columnName
498  << ", device_type: " << getMemoryLevelString(memory_level)
499  << ", device_id: " << device_id << "): " << cd->columnType.to_string();
500  res = linearizeVarLenArrayColFrags(table_key.db_id,
501  chunk_holder,
502  chunk_iter_holder,
503  local_chunk_holder,
504  local_chunk_iter_holder,
505  local_chunk_num_tuples,
506  memory_level,
507  cd,
508  device_id,
509  total_data_buf_size,
510  total_idx_buf_size,
511  total_num_tuples,
512  device_allocator,
513  thread_idx);
514  }
515  }
516  if (col_ti.is_string() && !col_ti.is_dict_encoded_string()) {
517  VLOG(2) << "Linearize variable-length multi-frag non-encoded text column (col_id: "
518  << cd->columnId << ", col_name: " << cd->columnName
519  << ", device_type: " << getMemoryLevelString(memory_level)
520  << ", device_id: " << device_id << "): " << cd->columnType.to_string();
521  res = linearizeVarLenArrayColFrags(table_key.db_id,
522  chunk_holder,
523  chunk_iter_holder,
524  local_chunk_holder,
525  local_chunk_iter_holder,
526  local_chunk_num_tuples,
527  memory_level,
528  cd,
529  device_id,
530  total_data_buf_size,
531  total_idx_buf_size,
532  total_num_tuples,
533  device_allocator,
534  thread_idx);
535  }
536  }
537  CHECK(res.first); // check merged data buffer
538  if (!col_ti.is_fixlen_array()) {
539  CHECK(res.second); // check merged index buffer
540  }
541  auto merged_data_buffer = res.first;
542  auto merged_index_buffer = res.second;
543 
544  // prepare ChunkIter for the linearized chunk
545  auto merged_chunk = std::make_shared<Chunk_NS::Chunk>(
546  merged_data_buffer, merged_index_buffer, cd, false);
547  // to prepare chunk_iter for the merged chunk, we pass one of local chunk iter
548  // to fill necessary metadata that is a common for all merged chunks
549  auto merged_chunk_iter = prepareChunkIter(merged_data_buffer,
550  merged_index_buffer,
551  *(local_chunk_iter_holder.rbegin()),
552  is_varlen_chunk,
553  total_num_tuples);
554  {
555  std::lock_guard<std::mutex> chunk_list_lock(chunk_list_mutex_);
556  chunk_holder.push_back(merged_chunk);
557  chunk_iter_holder.push_back(merged_chunk_iter);
558  }
559 
560  auto merged_chunk_iter_ptr = reinterpret_cast<int8_t*>(&(chunk_iter_holder.back()));
561  if (memory_level == MemoryLevel::CPU_LEVEL) {
562  addMergedChunkIter(col_desc, 0, merged_chunk_iter_ptr);
563  return merged_chunk_iter_ptr;
564  } else {
565  CHECK_EQ(Data_Namespace::GPU_LEVEL, memory_level);
566  CHECK(device_allocator);
567  addMergedChunkIter(col_desc, device_id, merged_chunk_iter_ptr);
568  // note that merged_chunk_iter_ptr resides in CPU memory space
569  // having its content aware GPU buffer that we alloc. for merging
570  // so we need to copy this chunk_iter to each device explicitly
571  auto chunk_iter_gpu = device_allocator->alloc(sizeof(ChunkIter));
572  device_allocator->copyToDevice(
573  chunk_iter_gpu, merged_chunk_iter_ptr, sizeof(ChunkIter));
574  return chunk_iter_gpu;
575  }
576 }

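The heart of the index-buffer handling above is rebasing each fragment's offsets by the number of data bytes merged before it. A standalone illustration of that idea (not engine code; the real implementation additionally deals with null padding, negative offsets, and parallel modification):

  #include <cstddef>
  #include <cstdint>
  #include <vector>

  // Each fragment supplies N+1 offsets for its N varlen values; the merged
  // buffer keeps one leading 0 and shifts every later fragment's offsets by
  // the total data bytes appended so far.
  std::vector<int32_t> merge_offsets(
      const std::vector<std::vector<int32_t>>& frag_offsets) {
    std::vector<int32_t> merged;
    int32_t data_bytes_so_far = 0;
    for (const auto& frag : frag_offsets) {
      const std::size_t start = merged.empty() ? 0 : 1;  // skip duplicate leading 0
      for (std::size_t i = start; i < frag.size(); ++i) {
        merged.push_back(frag[i] + data_bytes_so_far);
      }
      data_bytes_so_far += frag.back();  // this fragment's total data bytes
    }
    return merged;
  }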

MergedChunk ColumnFetcher::linearizeFixedLenArrayColFrags ( int32_t  db_id,
std::list< std::shared_ptr< Chunk_NS::Chunk >> &  chunk_holder,
std::list< ChunkIter > &  chunk_iter_holder,
std::list< std::shared_ptr< Chunk_NS::Chunk >> &  local_chunk_holder,
std::list< ChunkIter > &  local_chunk_iter_holder,
std::list< size_t > &  local_chunk_num_tuples,
MemoryLevel  memory_level,
const ColumnDescriptor *  cd,
const int  device_id,
const size_t  total_data_buf_size,
const size_t  total_idx_buf_size,
const size_t  total_num_tuples,
DeviceAllocator *  device_allocator,
const size_t  thread_idx 
) const
private

Definition at line 903 of file ColumnFetcher.cpp.

References Data_Namespace::AbstractBuffer::append(), CHECK_EQ, check_interrupt(), ColumnDescriptor::columnId, Data_Namespace::CPU_LEVEL, Executor::ERR_INTERRUPTED, executor_, g_enable_non_kernel_time_query_interrupt, anonymous_namespace{ColumnFetcher.cpp}::getMemoryLevelString(), linearized_col_cache_mutex_, linearized_data_buf_cache_, ColumnDescriptor::tableId, timer_start(), timer_stop(), and VLOG.

Referenced by linearizeColumnFragments().

917  {
918  int64_t linearization_time_ms = 0;
919  auto clock_begin = timer_start();
920  // linearize collected fragments
921  AbstractBuffer* merged_data_buffer = nullptr;
922  bool has_cached_merged_data_buf = false;
923  const InputColDescriptor icd(cd->columnId, cd->tableId, db_id, int(0));
924  {
925  std::lock_guard<std::mutex> linearized_col_cache_guard(linearized_col_cache_mutex_);
926  auto cached_data_buf_cache_it = linearized_data_buf_cache_.find(icd);
927  if (cached_data_buf_cache_it != linearized_data_buf_cache_.end()) {
928  auto& cd_cache = cached_data_buf_cache_it->second;
929  auto cached_data_buf_it = cd_cache.find(device_id);
930  if (cached_data_buf_it != cd_cache.end()) {
931  has_cached_merged_data_buf = true;
932  merged_data_buffer = cached_data_buf_it->second;
933  VLOG(2) << "Recycle merged data buffer for linearized chunks (memory_level: "
934  << getMemoryLevelString(memory_level) << ", device_id: " << device_id
935  << ")";
936  } else {
937  merged_data_buffer =
938  executor_->getDataMgr()->alloc(memory_level, device_id, total_data_buf_size);
939  VLOG(2) << "Allocate " << total_data_buf_size
940  << " bytes of data buffer space for linearized chunks (memory_level: "
941  << getMemoryLevelString(memory_level) << ", device_id: " << device_id
942  << ")";
943  cd_cache.insert(std::make_pair(device_id, merged_data_buffer));
944  }
945  } else {
946  DeviceMergedChunkMap m;
947  merged_data_buffer =
948  executor_->getDataMgr()->alloc(memory_level, device_id, total_data_buf_size);
949  VLOG(2) << "Allocate " << total_data_buf_size
950  << " bytes of data buffer space for linearized chunks (memory_level: "
951  << getMemoryLevelString(memory_level) << ", device_id: " << device_id
952  << ")";
953  m.insert(std::make_pair(device_id, merged_data_buffer));
954  linearized_data_buf_cache_.insert(std::make_pair(icd, m));
955  }
956  }
957  if (!has_cached_merged_data_buf) {
958  size_t sum_data_buf_size = 0;
959  auto chunk_holder_it = local_chunk_holder.begin();
960  auto chunk_iter_holder_it = local_chunk_iter_holder.begin();
961  for (; chunk_holder_it != local_chunk_holder.end();
962  chunk_holder_it++, chunk_iter_holder_it++) {
963  if (g_enable_non_kernel_time_query_interrupt && check_interrupt()) {
964  throw QueryExecutionError(Executor::ERR_INTERRUPTED);
965  }
966  auto target_chunk = chunk_holder_it->get();
967  auto target_chunk_data_buffer = target_chunk->getBuffer();
968  merged_data_buffer->append(target_chunk_data_buffer->getMemoryPtr(),
969  target_chunk_data_buffer->size(),
970  Data_Namespace::CPU_LEVEL,
971  device_id);
972  sum_data_buf_size += target_chunk_data_buffer->size();
973  }
974  // check whether each chunk's data buffer is clean under chunk merging
975  CHECK_EQ(total_data_buf_size, sum_data_buf_size);
976  }
977  linearization_time_ms += timer_stop(clock_begin);
978  VLOG(2) << "Linearization has been successfully done, elapsed time: "
979  << linearization_time_ms << " ms.";
980  return {merged_data_buffer, nullptr};
981 }


MergedChunk ColumnFetcher::linearizeVarLenArrayColFrags ( int32_t  db_id,
std::list< std::shared_ptr< Chunk_NS::Chunk >> &  chunk_holder,
std::list< ChunkIter > &  chunk_iter_holder,
std::list< std::shared_ptr< Chunk_NS::Chunk >> &  local_chunk_holder,
std::list< ChunkIter > &  local_chunk_iter_holder,
std::list< size_t > &  local_chunk_num_tuples,
MemoryLevel  memory_level,
const ColumnDescriptor *  cd,
const int  device_id,
const size_t  total_data_buf_size,
const size_t  total_idx_buf_size,
const size_t  total_num_tuples,
DeviceAllocator *  device_allocator,
const size_t  thread_idx 
) const
private

Definition at line 578 of file ColumnFetcher.cpp.

References Data_Namespace::AbstractBuffer::append(), threading_serial::async(), CHECK, ColumnDescriptor::columnId, gpu_enabled::copy(), DeviceAllocator::copyToDevice(), Data_Namespace::CPU_LEVEL, cpu_threads(), ArrayNoneEncoder::DEFAULT_NULL_PADDING_SIZE, run_benchmark_import::dest, Executor::ERR_INTERRUPTED, executor_, g_enable_non_kernel_time_query_interrupt, g_enable_parallel_linearization, anonymous_namespace{ColumnFetcher.cpp}::getMemoryLevelString(), Data_Namespace::AbstractBuffer::getMemoryPtr(), Data_Namespace::GPU_LEVEL, LIKELY, linearized_col_cache_mutex_, linearized_data_buf_cache_, linearized_idx_buf_cache_, linearlized_temporary_cpu_index_buf_cache_, makeIntervals(), gpu_enabled::sort(), ColumnDescriptor::tableId, timer_start(), timer_stop(), and VLOG.

Referenced by linearizeColumnFragments().

592  {
593  // for linearization of varlen col we have to deal with not only data buffer
594  // but also its underlying index buffer which is responsible for offset of varlen value
595  // basically we maintain per-device linearized (data/index) buffer
596  // for data buffer, we linearize varlen col's chunks within a device-specific buffer
597  // by just appending each chunk
598  // for index buffer, we need to not only appending each chunk but modify the offset
599  // value to affect various conditions like nullness, padding and so on so we first
600  // append index buffer in CPU, manipulate it as we required and then copy it to specific
601  // device if necessary (for GPU execution)
602  AbstractBuffer* merged_index_buffer_in_cpu = nullptr;
603  AbstractBuffer* merged_data_buffer = nullptr;
604  bool has_cached_merged_idx_buf = false;
605  bool has_cached_merged_data_buf = false;
606  const InputColDescriptor icd(cd->columnId, cd->tableId, db_id, int(0));
607  // check linearized buffer's cache first
608  // if not exists, alloc necessary buffer space to prepare linearization
609  int64_t linearization_time_ms = 0;
610  auto clock_begin = timer_start();
611  {
612  std::lock_guard<std::mutex> linearized_col_cache_guard(linearized_col_cache_mutex_);
613  auto cached_data_buf_cache_it = linearized_data_buf_cache_.find(icd);
614  if (cached_data_buf_cache_it != linearized_data_buf_cache_.end()) {
615  auto& cd_cache = cached_data_buf_cache_it->second;
616  auto cached_data_buf_it = cd_cache.find(device_id);
617  if (cached_data_buf_it != cd_cache.end()) {
618  has_cached_merged_data_buf = true;
619  merged_data_buffer = cached_data_buf_it->second;
620  VLOG(2) << "Recycle merged data buffer for linearized chunks (memory_level: "
621  << getMemoryLevelString(memory_level) << ", device_id: " << device_id
622  << ")";
623  } else {
624  merged_data_buffer =
625  executor_->getDataMgr()->alloc(memory_level, device_id, total_data_buf_size);
626  VLOG(2) << "Allocate " << total_data_buf_size
627  << " bytes of data buffer space for linearized chunks (memory_level: "
628  << getMemoryLevelString(memory_level) << ", device_id: " << device_id
629  << ")";
630  cd_cache.insert(std::make_pair(device_id, merged_data_buffer));
631  }
632  } else {
633  DeviceMergedChunkMap m;
634  merged_data_buffer =
635  executor_->getDataMgr()->alloc(memory_level, device_id, total_data_buf_size);
636  VLOG(2) << "Allocate " << total_data_buf_size
637  << " bytes of data buffer space for linearized chunks (memory_level: "
638  << getMemoryLevelString(memory_level) << ", device_id: " << device_id
639  << ")";
640  m.insert(std::make_pair(device_id, merged_data_buffer));
641  linearized_data_buf_cache_.insert(std::make_pair(icd, m));
642  }
643 
644  auto cached_index_buf_it =
645  linearlized_temporary_cpu_index_buf_cache_.find(cd->columnId);
646  if (cached_index_buf_it != linearlized_temporary_cpu_index_buf_cache_.end()) {
647  has_cached_merged_idx_buf = true;
648  merged_index_buffer_in_cpu = cached_index_buf_it->second;
649  VLOG(2)
650  << "Recycle merged temporary idx buffer for linearized chunks (memory_level: "
651  << getMemoryLevelString(memory_level) << ", device_id: " << device_id << ")";
652  } else {
653  auto idx_buf_size = total_idx_buf_size + sizeof(ArrayOffsetT);
654  merged_index_buffer_in_cpu =
655  executor_->getDataMgr()->alloc(Data_Namespace::CPU_LEVEL, 0, idx_buf_size);
656  VLOG(2) << "Allocate " << idx_buf_size
657  << " bytes of temporary idx buffer space on CPU for linearized chunks";
658  // just copy the buf addr since we access it via the pointer itself
659  linearlized_temporary_cpu_index_buf_cache_.insert(
660  std::make_pair(cd->columnId, merged_index_buffer_in_cpu));
661  }
662  }
663 
664  // linearize buffers if we don't have corresponding buf in cache
665  size_t sum_data_buf_size = 0;
666  size_t cur_sum_num_tuples = 0;
667  size_t total_idx_size_modifier = 0;
668  auto chunk_holder_it = local_chunk_holder.begin();
669  auto chunk_iter_holder_it = local_chunk_iter_holder.begin();
670  auto chunk_num_tuple_it = local_chunk_num_tuples.begin();
671  bool null_padded_first_elem = false;
672  bool null_padded_last_val = false;
673  // before entering the actual linearization part, we first need to check
674  // the overflow case where the sum of index offset becomes larger than 2GB
675  // which currently incurs incorrect query result due to negative array offset
676  // note that we can separate this from the main linearization logic b/c
677  // we just need to see few last elems
678  // todo (yoonmin) : relax this to support larger chunk size (>2GB)
679  for (; chunk_holder_it != local_chunk_holder.end();
680  chunk_holder_it++, chunk_num_tuple_it++) {
681  // check the offset overflow based on the last "valid" offset for each chunk
682  auto target_chunk = chunk_holder_it->get();
683  auto target_chunk_data_buffer = target_chunk->getBuffer();
684  auto target_chunk_idx_buffer = target_chunk->getIndexBuf();
685  auto target_idx_buf_ptr =
686  reinterpret_cast<ArrayOffsetT*>(target_chunk_idx_buffer->getMemoryPtr());
687  auto cur_chunk_num_tuples = *chunk_num_tuple_it;
688  ArrayOffsetT original_offset = -1;
689  size_t cur_idx = cur_chunk_num_tuples;
690  // find the valid (e.g., non-null) offset starting from the last elem
691  while (original_offset < 0) {
692  original_offset = target_idx_buf_ptr[--cur_idx];
693  }
694  ArrayOffsetT new_offset = original_offset + sum_data_buf_size;
695  if (new_offset < 0) {
696  throw std::runtime_error(
697  "Linearization of a variable-length column having chunk size larger than 2GB "
698  "not supported yet");
699  }
700  sum_data_buf_size += target_chunk_data_buffer->size();
701  }
702  chunk_holder_it = local_chunk_holder.begin();
703  chunk_num_tuple_it = local_chunk_num_tuples.begin();
704  sum_data_buf_size = 0;
705 
706  for (; chunk_holder_it != local_chunk_holder.end();
707  chunk_holder_it++, chunk_iter_holder_it++, chunk_num_tuple_it++) {
708  if (g_enable_non_kernel_time_query_interrupt &&
709  executor_->checkNonKernelTimeInterrupted()) {
710  throw QueryExecutionError(Executor::ERR_INTERRUPTED);
711  }
712  auto target_chunk = chunk_holder_it->get();
713  auto target_chunk_data_buffer = target_chunk->getBuffer();
714  auto cur_chunk_num_tuples = *chunk_num_tuple_it;
715  auto target_chunk_idx_buffer = target_chunk->getIndexBuf();
716  auto target_idx_buf_ptr =
717  reinterpret_cast<ArrayOffsetT*>(target_chunk_idx_buffer->getMemoryPtr());
718  auto idx_buf_size = target_chunk_idx_buffer->size() - sizeof(ArrayOffsetT);
719  auto target_data_buffer_start_ptr = target_chunk_data_buffer->getMemoryPtr();
720  auto target_data_buffer_size = target_chunk_data_buffer->size();
721 
722  // when linearizing idx buffers, we need to consider the following cases
723  // 1. the first idx val is padded (a. null / b. empty varlen arr / c. 1-byte size
724  // varlen arr, i.e., {1})
725  // 2. the last idx val is null
726  // 3. null value(s) is/are located in a middle of idx buf <-- we don't need to care
727  if (cur_sum_num_tuples > 0 && target_idx_buf_ptr[0] > 0) {
728  null_padded_first_elem = true;
729  target_data_buffer_start_ptr += ArrayNoneEncoder::DEFAULT_NULL_PADDING_SIZE;
730  target_data_buffer_size -= ArrayNoneEncoder::DEFAULT_NULL_PADDING_SIZE;
731  total_idx_size_modifier += ArrayNoneEncoder::DEFAULT_NULL_PADDING_SIZE;
732  }
733  // we linearize data_buf in device-specific buffer
734  if (!has_cached_merged_data_buf) {
735  merged_data_buffer->append(target_data_buffer_start_ptr,
736  target_data_buffer_size,
737  Data_Namespace::CPU_LEVEL,
738  device_id);
739  }
740 
741  if (!has_cached_merged_idx_buf) {
742  // linearize idx buf in CPU first
743  merged_index_buffer_in_cpu->append(target_chunk_idx_buffer->getMemoryPtr(),
744  idx_buf_size,
745  Data_Namespace::CPU_LEVEL,
746  0); // merged_index_buffer_in_cpu resides in CPU
747  auto idx_buf_ptr =
748  reinterpret_cast<ArrayOffsetT*>(merged_index_buffer_in_cpu->getMemoryPtr());
749  // here, we do not need to manipulate the very first idx buf, just let it as is
750  // and modify otherwise (i.e., starting from second chunk idx buf)
751  if (cur_sum_num_tuples > 0) {
752  if (null_padded_last_val) {
753  // case 2. the previous chunk's last index val is null so we need to set this
754  // chunk's first val to be null
755  idx_buf_ptr[cur_sum_num_tuples] = -sum_data_buf_size;
756  }
757  const size_t worker_count = cpu_threads();
758  std::vector<std::future<void>> conversion_threads;
759  std::vector<std::vector<size_t>> null_padded_row_idx_vecs(worker_count,
760  std::vector<size_t>());
761  bool is_parallel_modification = false;
762  std::vector<size_t> null_padded_row_idx_vec;
763  const auto do_work = [&cur_sum_num_tuples,
764  &sum_data_buf_size,
765  &null_padded_first_elem,
766  &idx_buf_ptr](
767  const size_t start,
768  const size_t end,
769  const bool is_parallel_modification,
770  std::vector<size_t>* null_padded_row_idx_vec) {
771  for (size_t i = start; i < end; i++) {
772  if (LIKELY(idx_buf_ptr[cur_sum_num_tuples + i] >= 0)) {
773  if (null_padded_first_elem) {
774  // deal with null padded bytes
775  idx_buf_ptr[cur_sum_num_tuples + i] -=
776  ArrayNoneEncoder::DEFAULT_NULL_PADDING_SIZE;
777  }
778  idx_buf_ptr[cur_sum_num_tuples + i] += sum_data_buf_size;
779  } else {
780  // a null padded row needs to reference the previous row's idx, so in
781  // multi-threaded index modification we may suffer from thread
782  // contention when thread-i needs to reference a row idx owned by
783  // thread-j; we therefore collect the row idxs of null rows here and
784  // patch them after this step
785  null_padded_row_idx_vec->push_back(cur_sum_num_tuples + i);
786  }
787  }
788  };
789  if (cur_chunk_num_tuples > g_enable_parallel_linearization) {
790  is_parallel_modification = true;
791  for (auto interval :
792  makeIntervals(size_t(0), cur_chunk_num_tuples, worker_count)) {
793  conversion_threads.push_back(
794  std::async(std::launch::async,
795  do_work,
796  interval.begin,
797  interval.end,
798  is_parallel_modification,
799  &null_padded_row_idx_vecs[interval.index]));
800  }
801  for (auto& child : conversion_threads) {
802  child.wait();
803  }
804  for (auto& v : null_padded_row_idx_vecs) {
805  std::copy(v.begin(), v.end(), std::back_inserter(null_padded_row_idx_vec));
806  }
807  } else {
808  do_work(size_t(0),
809  cur_chunk_num_tuples,
810  is_parallel_modification,
811  &null_padded_row_idx_vec);
812  }
813  if (!null_padded_row_idx_vec.empty()) {
814  // patch null padded row idxs by referencing the previous row;
815  // sort the row idxs first so that modified idxs propagate correctly
816  std::sort(null_padded_row_idx_vec.begin(), null_padded_row_idx_vec.end());
817  for (auto& padded_null_row_idx : null_padded_row_idx_vec) {
818  if (idx_buf_ptr[padded_null_row_idx - 1] > 0) {
819  idx_buf_ptr[padded_null_row_idx] = -idx_buf_ptr[padded_null_row_idx - 1];
820  } else {
821  idx_buf_ptr[padded_null_row_idx] = idx_buf_ptr[padded_null_row_idx - 1];
822  }
823  }
824  }
825  }
826  }
827  cur_sum_num_tuples += cur_chunk_num_tuples;
828  sum_data_buf_size += target_chunk_data_buffer->size();
829  if (target_idx_buf_ptr[*chunk_num_tuple_it] < 0) {
830  null_padded_last_val = true;
831  } else {
832  null_padded_last_val = false;
833  }
834  if (null_padded_first_elem) {
835  sum_data_buf_size -= ArrayNoneEncoder::DEFAULT_NULL_PADDING_SIZE;
836  null_padded_first_elem = false; // reset for the next chunk
837  }
838  if (!has_cached_merged_idx_buf && cur_sum_num_tuples == total_num_tuples) {
839  auto merged_index_buffer_ptr =
840  reinterpret_cast<ArrayOffsetT*>(merged_index_buffer_in_cpu->getMemoryPtr());
841  merged_index_buffer_ptr[total_num_tuples] =
842  total_data_buf_size -
843  total_idx_size_modifier; // last index value is total data size;
844  }
845  }
846 
// put the linearized index buffer into the per-device cache
848  AbstractBuffer* merged_index_buffer = nullptr;
849  size_t buf_size = total_idx_buf_size + sizeof(ArrayOffsetT);
850  auto copyBuf =
851  [&device_allocator](
852  int8_t* src, int8_t* dest, size_t buf_size, MemoryLevel memory_level) {
853  if (memory_level == Data_Namespace::CPU_LEVEL) {
854  memcpy((void*)dest, src, buf_size);
855  } else {
856  CHECK(memory_level == Data_Namespace::GPU_LEVEL);
857  device_allocator->copyToDevice(dest, src, buf_size);
858  }
859  };
860  {
861  std::lock_guard<std::mutex> linearized_col_cache_guard(linearized_col_cache_mutex_);
862  auto merged_idx_buf_cache_it = linearized_idx_buf_cache_.find(icd);
// for CPU execution, we can use `merged_index_buffer_in_cpu` as-is,
// but for GPU we have to copy it to the corresponding device
865  if (memory_level == MemoryLevel::GPU_LEVEL) {
866  if (merged_idx_buf_cache_it != linearized_idx_buf_cache_.end()) {
867  auto& merged_idx_buf_cache = merged_idx_buf_cache_it->second;
868  auto merged_idx_buf_it = merged_idx_buf_cache.find(device_id);
869  if (merged_idx_buf_it != merged_idx_buf_cache.end()) {
870  merged_index_buffer = merged_idx_buf_it->second;
871  } else {
872  merged_index_buffer =
873  executor_->getDataMgr()->alloc(memory_level, device_id, buf_size);
874  copyBuf(merged_index_buffer_in_cpu->getMemoryPtr(),
875  merged_index_buffer->getMemoryPtr(),
876  buf_size,
877  memory_level);
878  merged_idx_buf_cache.insert(std::make_pair(device_id, merged_index_buffer));
879  }
880  } else {
881  merged_index_buffer =
882  executor_->getDataMgr()->alloc(memory_level, device_id, buf_size);
883  copyBuf(merged_index_buffer_in_cpu->getMemoryPtr(),
884  merged_index_buffer->getMemoryPtr(),
885  buf_size,
886  memory_level);
887  DeviceMergedChunkMap m;
888  m.insert(std::make_pair(device_id, merged_index_buffer));
889  linearized_idx_buf_cache_.insert(std::make_pair(icd, m));
890  }
891  } else {
892  // `linearlized_temporary_cpu_index_buf_cache_` has this buf
893  merged_index_buffer = merged_index_buffer_in_cpu;
894  }
895  }
896  CHECK(merged_index_buffer);
897  linearization_time_ms += timer_stop(clock_begin);
898  VLOG(2) << "Linearization has been successfully done, elapsed time: "
899  << linearization_time_ms << " ms.";
900  return {merged_data_buffer, merged_index_buffer};
901 }
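
The heart of the second loop above is offset rebasing: every valid (non-negative) index entry of a chunk is shifted by the number of data bytes already merged, while null rows (negative entries) are patched afterward to mirror the previous row's rebased end offset. Below is a minimal, self-contained sketch of that arithmetic; the container names and the explicit two-pass structure are illustrative stand-ins, not ColumnFetcher internals, and the parallel interval split of the real code is omitted.

#include <cstdint>
#include <iostream>
#include <vector>

using ArrayOffsetT = int32_t;

int main() {
  // Offsets already merged from earlier chunks (16 bytes of payload so far).
  // A negative entry marks a null row and mirrors the previous row's end.
  std::vector<ArrayOffsetT> merged = {0, 8, -8, 16};
  const ArrayOffsetT sum_data_buf_size = 16;

  // The next chunk's local offsets: a 4-byte row, a null row, an 8-byte row.
  const std::vector<ArrayOffsetT> next_chunk = {4, -4, 12};

  // Pass 1: rebase valid offsets; remember null rows for patching.
  std::vector<size_t> null_rows;
  for (const ArrayOffsetT off : next_chunk) {
    if (off >= 0) {
      merged.push_back(off + sum_data_buf_size);
    } else {
      null_rows.push_back(merged.size());
      merged.push_back(0);  // placeholder, patched in pass 2
    }
  }
  // Pass 2: a null row re-references the previous row's rebased offset,
  // negated if that offset is positive (the same rule as the listing above).
  for (const size_t row : null_rows) {
    const ArrayOffsetT prev = merged[row - 1];
    merged[row] = prev > 0 ? -prev : prev;
  }

  for (const ArrayOffsetT off : merged) {
    std::cout << off << ' ';  // prints: 0 8 -8 16 20 -20 28
  }
  std::cout << '\n';
  return 0;
}

Collecting the null rows and patching them serially after the rebasing pass is what lets the real code parallelize pass 1 safely: a null row may need the offset of a row owned by another worker's interval.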

JoinColumn ColumnFetcher::makeJoinColumn ( Executor *  executor,
const Analyzer::ColumnVar &  hash_col,
const std::vector< Fragmenter_Namespace::FragmentInfo > &  fragments,
const Data_Namespace::MemoryLevel  effective_mem_lvl,
const int  device_id,
DeviceAllocator *  device_allocator,
const size_t  thread_idx,
std::vector< std::shared_ptr< Chunk_NS::Chunk >> &  chunks_owner,
std::vector< std::shared_ptr< void >> &  malloc_owner,
ColumnCacheMap &  column_cache 
)
static

Creates a JoinColumn struct containing an array of JoinChunk structs.

makeJoinColumn() creates a JoinColumn struct containing an array of JoinChunk structs, col_chunks_buff, malloc'ed in CPU memory. Although the col_chunks_buff array itself resides in CPU memory, each JoinChunk struct contains an int8_t* pointer from getOneColumnFragment(), col_buff, that can point to either CPU or GPU memory depending on the effective_mem_lvl parameter. See also the fetchJoinColumn() function, where col_chunks_buff is copied into GPU memory if needed. The malloc'ed array is appended to the malloc_owner parameter, and the fetched chunks are appended to chunks_owner.

Definition at line 155 of file ColumnFetcher.cpp.

References CHECK, CHECK_GT, checked_malloc(), JoinChunk::col_buff, Data_Namespace::CPU_LEVEL, Executor::ERR_INTERRUPTED, g_enable_non_kernel_time_query_interrupt, SQLTypeInfo::get_size(), Analyzer::Expr::get_type_info(), getOneColumnFragment(), and JoinChunk::num_elems.

Referenced by HashJoin::fetchJoinColumn().

165  {
166  CHECK(!fragments.empty());
167 
168  size_t col_chunks_buff_sz = sizeof(struct JoinChunk) * fragments.size();
169  // TODO: needs an allocator owner
170  auto col_chunks_buff = reinterpret_cast<int8_t*>(
171  malloc_owner.emplace_back(checked_malloc(col_chunks_buff_sz), free).get());
172  auto join_chunk_array = reinterpret_cast<struct JoinChunk*>(col_chunks_buff);
173 
174  size_t num_elems = 0;
175  size_t num_chunks = 0;
176  for (auto& frag : fragments) {
177  if (UNLIKELY(g_enable_non_kernel_time_query_interrupt &&
178  executor->checkNonKernelTimeInterrupted())) {
179  throw QueryExecutionError(Executor::ERR_INTERRUPTED);
180  }
181  auto [col_buff, elem_count] = getOneColumnFragment(
182  executor,
183  hash_col,
184  frag,
185  effective_mem_lvl,
186  effective_mem_lvl == Data_Namespace::CPU_LEVEL ? 0 : device_id,
187  device_allocator,
188  thread_idx,
189  chunks_owner,
190  column_cache);
191  if (col_buff != nullptr) {
192  num_elems += elem_count;
193  join_chunk_array[num_chunks] = JoinChunk{col_buff, elem_count};
194  } else {
195  continue;
196  }
197  ++num_chunks;
198  }
199 
200  int elem_sz = hash_col.get_type_info().get_size();
201  CHECK_GT(elem_sz, 0);
202 
203  return {col_chunks_buff,
204  col_chunks_buff_sz,
205  num_chunks,
206  num_elems,
207  static_cast<size_t>(elem_sz)};
208 }
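
To make the returned layout concrete, here is a small consumer-side sketch. The two structs below mirror the fields visible in the listing (col_buff/num_elems and the five members of the returned JoinColumn) but are local stand-ins rather than the project's headers, and the walk assumes the chunks were fetched at CPU_LEVEL so col_buff may be dereferenced on the host.

#include <cstddef>
#include <cstdint>
#include <iostream>
#include <vector>

struct JoinChunk {
  const int8_t* col_buff;  // CPU or GPU memory, per effective_mem_lvl
  size_t num_elems;
};

struct JoinColumn {
  const int8_t* col_chunks_buff;  // CPU-resident array of JoinChunk
  size_t col_chunks_buff_sz;
  size_t num_chunks;
  size_t num_elems;
  size_t elem_sz;
};

// Sum the payload bytes across chunks; dereferencing col_buff itself is
// only valid when the chunks were fetched at CPU_LEVEL.
size_t total_payload_bytes(const JoinColumn& jc) {
  const auto* chunks = reinterpret_cast<const JoinChunk*>(jc.col_chunks_buff);
  size_t bytes = 0;
  for (size_t i = 0; i < jc.num_chunks; ++i) {
    bytes += chunks[i].num_elems * jc.elem_sz;
  }
  return bytes;
}

int main() {
  const std::vector<int32_t> frag0 = {1, 2, 3};
  const std::vector<int32_t> frag1 = {4, 5};
  const std::vector<JoinChunk> chunks = {
      {reinterpret_cast<const int8_t*>(frag0.data()), frag0.size()},
      {reinterpret_cast<const int8_t*>(frag1.data()), frag1.size()}};
  const JoinColumn jc{reinterpret_cast<const int8_t*>(chunks.data()),
                      chunks.size() * sizeof(JoinChunk),
                      chunks.size(),
                      frag0.size() + frag1.size(),
                      sizeof(int32_t)};
  std::cout << total_payload_bytes(jc) << " bytes\n";  // prints: 20 bytes
  return 0;
}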

ChunkIter ColumnFetcher::prepareChunkIter ( AbstractBuffer *  merged_data_buf,
AbstractBuffer *  merged_index_buf,
ChunkIter &  chunk_iter,
bool  is_true_varlen_type,
const size_t  total_num_tuples 
) const
private

Definition at line 1049 of file ColumnFetcher.cpp.

References ChunkIter::current_pos, ChunkIter::end_pos, Data_Namespace::AbstractBuffer::getMemoryPtr(), ChunkIter::num_elems, ChunkIter::second_buf, Data_Namespace::AbstractBuffer::size(), ChunkIter::skip, ChunkIter::skip_size, ChunkIter::start_pos, and ChunkIter::type_info.

Referenced by linearizeColumnFragments().

1053  {
1054  ChunkIter merged_chunk_iter;
1055  if (is_true_varlen_type) {
1056  merged_chunk_iter.start_pos = merged_index_buf->getMemoryPtr();
1057  merged_chunk_iter.current_pos = merged_index_buf->getMemoryPtr();
1058  merged_chunk_iter.end_pos = merged_index_buf->getMemoryPtr() +
1059  merged_index_buf->size() - sizeof(StringOffsetT);
1060  merged_chunk_iter.second_buf = merged_data_buf->getMemoryPtr();
1061  } else {
1062  merged_chunk_iter.start_pos = merged_data_buf->getMemoryPtr();
1063  merged_chunk_iter.current_pos = merged_data_buf->getMemoryPtr();
1064  merged_chunk_iter.end_pos = merged_data_buf->getMemoryPtr() + merged_data_buf->size();
1065  merged_chunk_iter.second_buf = nullptr;
1066  }
1067  merged_chunk_iter.num_elems = total_num_tuples;
1068  merged_chunk_iter.skip = chunk_iter.skip;
1069  merged_chunk_iter.skip_size = chunk_iter.skip_size;
1070  merged_chunk_iter.type_info = chunk_iter.type_info;
1071  return merged_chunk_iter;
1072 }
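
For a true varlen type, the iterator built above walks the merged index buffer via start_pos/end_pos while second_buf holds the payload. The following sketch shows that addressing under the null convention used by the linearization code (a negative end offset marks a null row); get_varlen_elem() is a hypothetical helper, not ChunkIter's real API.

#include <cstdint>
#include <cstdlib>
#include <iostream>

using ArrayOffsetT = int32_t;

struct VarlenView {
  const int8_t* ptr;  // element start inside the data (second) buffer
  size_t length;      // element size in bytes; 0 for a null row
};

// offsets has num_elems + 1 entries; a negative entry at i + 1 marks row i
// as null, with its magnitude still encoding the running end position.
VarlenView get_varlen_elem(const ArrayOffsetT* offsets,
                           const int8_t* second_buf,
                           const size_t i) {
  if (offsets[i + 1] < 0) {
    return {nullptr, 0};  // null row
  }
  const ArrayOffsetT begin = std::abs(offsets[i]);
  const ArrayOffsetT end = offsets[i + 1];
  return {second_buf + begin, static_cast<size_t>(end - begin)};
}

int main() {
  const ArrayOffsetT offsets[] = {0, 8, -8, 16};  // row 1 is null
  int8_t payload[16] = {};
  std::cout << get_varlen_elem(offsets, payload, 0).length << ' '   // 8
            << get_varlen_elem(offsets, payload, 1).length << ' '   // 0
            << get_varlen_elem(offsets, payload, 2).length << '\n'; // 8
  return 0;
}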

const int8_t * ColumnFetcher::transferColumnIfNeeded ( const ColumnarResults *  columnar_results,
const int  col_id,
Data_Namespace::DataMgr *  data_mgr,
const Data_Namespace::MemoryLevel  memory_level,
const int  device_id,
DeviceAllocator *  device_allocator 
)
staticprivate

Definition at line 983 of file ColumnFetcher.cpp.

References Allocator::alloc(), CHECK, CHECK_LT, DeviceAllocator::copyToDevice(), FlatBufferManager::getBufferSize(), ColumnarResults::getColumnBuffers(), ColumnarResults::getColumnType(), Data_Namespace::GPU_LEVEL, FlatBufferManager::isFlatBuffer(), and ColumnarResults::size().

Referenced by getAllTableColumnFragments(), getOneColumnFragment(), and getResultSetColumn().

989  {
990  if (!columnar_results) {
991  return nullptr;
992  }
993  const auto& col_buffers = columnar_results->getColumnBuffers();
994  CHECK_LT(static_cast<size_t>(col_id), col_buffers.size());
995  if (memory_level == Data_Namespace::GPU_LEVEL) {
996  const auto& col_ti = columnar_results->getColumnType(col_id);
997  size_t num_bytes;
998  if (col_ti.is_array() && FlatBufferManager::isFlatBuffer(col_buffers[col_id])) {
999  num_bytes = FlatBufferManager::getBufferSize(col_buffers[col_id]);
1000  } else {
1001  num_bytes = columnar_results->size() * col_ti.get_size();
1002  }
1003  CHECK(device_allocator);
1004  auto gpu_col_buffer = device_allocator->alloc(num_bytes);
1005  device_allocator->copyToDevice(gpu_col_buffer, col_buffers[col_id], num_bytes);
1006  return gpu_col_buffer;
1007  }
1008  return col_buffers[col_id];
1009 }
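
The GPU branch above is the usual allocate-then-copy pattern against the DeviceAllocator interface (alloc() followed by copyToDevice()). The sketch below exercises that pattern with a stub allocator; CpuScratchAllocator is hypothetical and backs the "device" with host memory so the example runs anywhere, whereas the real allocator targets GPU memory.

#include <cstdint>
#include <cstring>
#include <iostream>
#include <vector>

// Mirrors the two DeviceAllocator virtuals used by transferColumnIfNeeded().
struct DeviceAllocatorLike {
  virtual int8_t* alloc(const size_t num_bytes) = 0;
  virtual void copyToDevice(void* device_dst, const void* host_src,
                            const size_t num_bytes) const = 0;
  virtual ~DeviceAllocatorLike() = default;
};

// Hypothetical stand-in for a CUDA-backed allocator.
struct CpuScratchAllocator final : DeviceAllocatorLike {
  int8_t* alloc(const size_t num_bytes) override {
    buffers.emplace_back(num_bytes);
    return buffers.back().data();
  }
  void copyToDevice(void* device_dst, const void* host_src,
                    const size_t num_bytes) const override {
    std::memcpy(device_dst, host_src, num_bytes);  // real impl: H2D copy
  }
  std::vector<std::vector<int8_t>> buffers;  // owns the "device" memory
};

int main() {
  const std::vector<int32_t> col = {7, 8, 9};
  const size_t num_bytes = col.size() * sizeof(int32_t);
  CpuScratchAllocator allocator;
  int8_t* dev_buf = allocator.alloc(num_bytes);         // allocate
  allocator.copyToDevice(dev_buf, col.data(), num_bytes);  // then copy
  std::cout << reinterpret_cast<int32_t*>(dev_buf)[2] << '\n';  // prints: 9
  return 0;
}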

Friends And Related Function Documentation

friend class QueryCompilationDescriptor
friend

Definition at line 200 of file ColumnFetcher.h.

friend class TableFunctionExecutionContext
friend

Definition at line 201 of file ColumnFetcher.h.

Member Data Documentation

std::mutex ColumnFetcher::chunk_list_mutex_
mutableprivate

Definition at line 184 of file ColumnFetcher.h.

Referenced by getOneTableColumnFragment(), and linearizeColumnFragments().

std::mutex ColumnFetcher::columnar_fetch_mutex_
mutableprivate

Definition at line 181 of file ColumnFetcher.h.

Referenced by getAllTableColumnFragments(), and getResultSetColumn().

std::unordered_map<InputColDescriptor, std::unique_ptr<const ColumnarResults> > ColumnFetcher::columnarized_scan_table_cache_
mutableprivate

Definition at line 188 of file ColumnFetcher.h.

Referenced by getAllTableColumnFragments().

ColumnCacheMap ColumnFetcher::columnarized_table_cache_
mutableprivate
std::mutex ColumnFetcher::linearization_mutex_
mutableprivate

Definition at line 183 of file ColumnFetcher.h.

Referenced by linearizeColumnFragments().

std::mutex ColumnFetcher::linearized_col_cache_mutex_
mutableprivate
std::unordered_map<InputColDescriptor, DeviceMergedChunkMap> ColumnFetcher::linearized_data_buf_cache_
mutableprivate
std::unordered_map<InputColDescriptor, DeviceMergedChunkMap> ColumnFetcher::linearized_idx_buf_cache_
mutableprivate

Definition at line 198 of file ColumnFetcher.h.

Referenced by freeLinearizedBuf(), and linearizeVarLenArrayColFrags().

std::unordered_map<InputColDescriptor, DeviceMergedChunkIterMap> ColumnFetcher::linearized_multi_frag_chunk_iter_cache_
mutableprivate

Definition at line 192 of file ColumnFetcher.h.

Referenced by addMergedChunkIter(), getChunkiter(), and linearizeColumnFragments().

std::unordered_map<int, AbstractBuffer*> ColumnFetcher::linearlized_temporary_cpu_index_buf_cache_
mutableprivate
std::mutex ColumnFetcher::varlen_chunk_fetch_mutex_
mutableprivate

Definition at line 182 of file ColumnFetcher.h.

Referenced by getOneTableColumnFragment(), and linearizeColumnFragments().


The documentation for this class was generated from the following files:
ColumnFetcher.h
ColumnFetcher.cpp