OmniSciDB  d2f719934e
ColumnFetcher Class Reference

#include <ColumnFetcher.h>


Public Member Functions

 ColumnFetcher (Executor *executor, const ColumnCacheMap &column_cache)
 
const int8_t * getOneTableColumnFragment (const int table_id, const int frag_id, const int col_id, const std::map< int, const TableFragments * > &all_tables_fragments, std::list< std::shared_ptr< Chunk_NS::Chunk >> &chunk_holder, std::list< ChunkIter > &chunk_iter_holder, const Data_Namespace::MemoryLevel memory_level, const int device_id, DeviceAllocator *device_allocator) const
 
const int8_t * getAllTableColumnFragments (const int table_id, const int col_id, const std::map< int, const TableFragments * > &all_tables_fragments, const Data_Namespace::MemoryLevel memory_level, const int device_id, DeviceAllocator *device_allocator, const size_t thread_idx) const
 
const int8_t * getResultSetColumn (const InputColDescriptor *col_desc, const Data_Namespace::MemoryLevel memory_level, const int device_id, DeviceAllocator *device_allocator, const size_t thread_idx) const
 
const int8_t * linearizeColumnFragments (const int table_id, const int col_id, const std::map< int, const TableFragments * > &all_tables_fragments, std::list< std::shared_ptr< Chunk_NS::Chunk >> &chunk_holder, std::list< ChunkIter > &chunk_iter_holder, const Data_Namespace::MemoryLevel memory_level, const int device_id, DeviceAllocator *device_allocator, const size_t thread_idx) const
 
void freeTemporaryCpuLinearizedIdxBuf ()
 
void freeLinearizedBuf ()
 

Static Public Member Functions

static std::pair< const int8_t *, size_t > getOneColumnFragment (Executor *executor, const Analyzer::ColumnVar &hash_col, const Fragmenter_Namespace::FragmentInfo &fragment, const Data_Namespace::MemoryLevel effective_mem_lvl, const int device_id, DeviceAllocator *device_allocator, const size_t thread_idx, std::vector< std::shared_ptr< Chunk_NS::Chunk >> &chunks_owner, ColumnCacheMap &column_cache)
 Gets one chunk's pointer and element count on either CPU or GPU.
 
static JoinColumn makeJoinColumn (Executor *executor, const Analyzer::ColumnVar &hash_col, const std::vector< Fragmenter_Namespace::FragmentInfo > &fragments, const Data_Namespace::MemoryLevel effective_mem_lvl, const int device_id, DeviceAllocator *device_allocator, const size_t thread_idx, std::vector< std::shared_ptr< Chunk_NS::Chunk >> &chunks_owner, std::vector< std::shared_ptr< void >> &malloc_owner, ColumnCacheMap &column_cache)
 Creates a JoinColumn struct containing an array of JoinChunk structs.
 

Private Types

using DeviceMergedChunkIterMap = std::unordered_map< int, int8_t * >
 
using DeviceMergedChunkMap = std::unordered_map< int, AbstractBuffer * >
 

Private Member Functions

MergedChunk linearizeVarLenArrayColFrags (const Catalog_Namespace::Catalog &cat, std::list< std::shared_ptr< Chunk_NS::Chunk >> &chunk_holder, std::list< ChunkIter > &chunk_iter_holder, std::list< std::shared_ptr< Chunk_NS::Chunk >> &local_chunk_holder, std::list< ChunkIter > &local_chunk_iter_holder, std::list< size_t > &local_chunk_num_tuples, MemoryLevel memory_level, const ColumnDescriptor *cd, const int device_id, const size_t total_data_buf_size, const size_t total_idx_buf_size, const size_t total_num_tuples, DeviceAllocator *device_allocator, const size_t thread_idx) const
 
MergedChunk linearizeFixedLenArrayColFrags (const Catalog_Namespace::Catalog &cat, std::list< std::shared_ptr< Chunk_NS::Chunk >> &chunk_holder, std::list< ChunkIter > &chunk_iter_holder, std::list< std::shared_ptr< Chunk_NS::Chunk >> &local_chunk_holder, std::list< ChunkIter > &local_chunk_iter_holder, std::list< size_t > &local_chunk_num_tuples, MemoryLevel memory_level, const ColumnDescriptor *cd, const int device_id, const size_t total_data_buf_size, const size_t total_idx_buf_size, const size_t total_num_tuples, DeviceAllocator *device_allocator, const size_t thread_idx) const
 
void addMergedChunkIter (const InputColDescriptor col_desc, const int device_id, int8_t *chunk_iter_ptr) const
 
const int8_t * getChunkiter (const InputColDescriptor col_desc, const int device_id=0) const
 
ChunkIter prepareChunkIter (AbstractBuffer *merged_data_buf, AbstractBuffer *merged_index_buf, ChunkIter &chunk_iter, bool is_true_varlen_type, const size_t total_num_tuples) const
 
const int8_t * getResultSetColumn (const ResultSetPtr &buffer, const int table_id, const int col_id, const Data_Namespace::MemoryLevel memory_level, const int device_id, DeviceAllocator *device_allocator, const size_t thread_idx) const
 

Static Private Member Functions

static const int8_t * transferColumnIfNeeded (const ColumnarResults *columnar_results, const int col_id, Data_Namespace::DataMgr *data_mgr, const Data_Namespace::MemoryLevel memory_level, const int device_id, DeviceAllocator *device_allocator)
 

Private Attributes

Executor * executor_
 
std::mutex columnar_fetch_mutex_
 
std::mutex varlen_chunk_fetch_mutex_
 
std::mutex linearization_mutex_
 
std::mutex chunk_list_mutex_
 
std::mutex linearized_col_cache_mutex_
 
ColumnCacheMap columnarized_table_cache_
 
std::unordered_map< InputColDescriptor, std::unique_ptr< const ColumnarResults > > columnarized_scan_table_cache_
 
std::unordered_map< InputColDescriptor, DeviceMergedChunkIterMap > linearized_multi_frag_chunk_iter_cache_
 
std::unordered_map< int, AbstractBuffer * > linearlized_temporary_cpu_index_buf_cache_
 
std::unordered_map< InputColDescriptor, DeviceMergedChunkMap > linearized_data_buf_cache_
 
std::unordered_map< InputColDescriptor, DeviceMergedChunkMap > linearized_idx_buf_cache_
 

Friends

class QueryCompilationDescriptor
 
class TableFunctionExecutionContext
 

Detailed Description

Definition at line 49 of file ColumnFetcher.h.
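
A minimal usage sketch follows (hypothetical driver code, not part of ColumnFetcher itself; it assumes the caller already owns the Executor and the table_id/col_id/fragment map that the execution framework normally supplies):

  #include "ColumnFetcher.h"

  // Fetch the first fragment of a column to CPU memory.
  const int8_t* fetch_first_fragment_to_cpu(
      Executor* executor,
      const int table_id,
      const int col_id,
      const std::map<int, const TableFragments*>& all_tables_fragments) {
    ColumnCacheMap column_cache;  // starts empty; the fetcher populates it lazily
    ColumnFetcher column_fetcher(executor, column_cache);

    // The holders pin the fetched chunk and its iterator; they must outlive
    // any use of the returned pointer.
    std::list<std::shared_ptr<Chunk_NS::Chunk>> chunk_holder;
    std::list<ChunkIter> chunk_iter_holder;

    // For varlen columns the returned pointer is a ChunkIter*; otherwise it is
    // the raw column buffer. A device allocator is only needed for GPU_LEVEL.
    return column_fetcher.getOneTableColumnFragment(table_id,
                                                    /*frag_id=*/0,
                                                    col_id,
                                                    all_tables_fragments,
                                                    chunk_holder,
                                                    chunk_iter_holder,
                                                    Data_Namespace::CPU_LEVEL,
                                                    /*device_id=*/0,
                                                    /*device_allocator=*/nullptr);
  }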

Member Typedef Documentation

using ColumnFetcher::DeviceMergedChunkIterMap = std::unordered_map<int, int8_t*>
private

Definition at line 189 of file ColumnFetcher.h.

using ColumnFetcher::DeviceMergedChunkMap = std::unordered_map<int, AbstractBuffer*>
private

Definition at line 190 of file ColumnFetcher.h.

Constructor & Destructor Documentation

ColumnFetcher::ColumnFetcher ( Executor *  executor,
const ColumnCacheMap &  column_cache 
)

Definition at line 61 of file ColumnFetcher.cpp.

61 ColumnFetcher::ColumnFetcher(Executor* executor, const ColumnCacheMap& column_cache)
62  : executor_(executor), columnarized_table_cache_(column_cache) {}

Member Function Documentation

void ColumnFetcher::addMergedChunkIter ( const InputColDescriptor  col_desc,
const int  device_id,
int8_t *  chunk_iter_ptr 
) const
private

Definition at line 1007 of file ColumnFetcher.cpp.

References InputColDescriptor::getColId(), InputColDescriptor::getScanDesc(), InputDescriptor::getTableId(), linearized_col_cache_mutex_, linearized_multi_frag_chunk_iter_cache_, and VLOG.

Referenced by linearizeColumnFragments().

1009  {
1010  std::lock_guard<std::mutex> linearize_guard(linearized_col_cache_mutex_);
1011  auto chunk_iter_it = linearized_multi_frag_chunk_iter_cache_.find(col_desc);
1012  if (chunk_iter_it != linearized_multi_frag_chunk_iter_cache_.end()) {
1013  auto iter_device_it = chunk_iter_it->second.find(device_id);
1014  if (iter_device_it == chunk_iter_it->second.end()) {
1015  VLOG(2) << "Additional merged chunk_iter for col_desc (tbl: "
1016  << col_desc.getScanDesc().getTableId() << ", col: " << col_desc.getColId()
1017  << "), device_id: " << device_id;
1018  chunk_iter_it->second.emplace(device_id, chunk_iter_ptr);
1019  }
1020  } else {
1021  DeviceMergedChunkIterMap iter_m;
1022  iter_m.emplace(device_id, chunk_iter_ptr);
1023  VLOG(2) << "New merged chunk_iter for col_desc (tbl: "
1024  << col_desc.getScanDesc().getTableId() << ", col: " << col_desc.getColId()
1025  << "), device_id: " << device_id;
1026  linearized_multi_frag_chunk_iter_cache_.emplace(col_desc, iter_m);
1027  }
1028 }

void ColumnFetcher::freeLinearizedBuf ( )

Definition at line 1070 of file ColumnFetcher.cpp.

References cat(), executor_, linearized_col_cache_mutex_, linearized_data_buf_cache_, and linearized_idx_buf_cache_.

Referenced by Executor::executeWorkUnitImpl().

1070  {
1071  std::lock_guard<std::mutex> linearized_col_cache_guard(linearized_col_cache_mutex_);
1072  const auto& cat = *executor_->getCatalog();
1073  auto& data_mgr = cat.getDataMgr();
1074 
1075  if (!linearized_data_buf_cache_.empty()) {
1076  for (auto& kv : linearized_data_buf_cache_) {
1077  for (auto& kv2 : kv.second) {
1078  data_mgr.free(kv2.second);
1079  }
1080  }
1081  }
1082 
1083  if (!linearized_idx_buf_cache_.empty()) {
1084  for (auto& kv : linearized_idx_buf_cache_) {
1085  for (auto& kv2 : kv.second) {
1086  data_mgr.free(kv2.second);
1087  }
1088  }
1089  }
1090 }

void ColumnFetcher::freeTemporaryCpuLinearizedIdxBuf ( )

Definition at line 1092 of file ColumnFetcher.cpp.

References cat(), executor_, linearized_col_cache_mutex_, and linearlized_temporary_cpu_index_buf_cache_.

Referenced by Executor::executeWorkUnitImpl().

1092  {
1093  std::lock_guard<std::mutex> linearized_col_cache_guard(linearized_col_cache_mutex_);
1094  const auto& cat = *executor_->getCatalog();
1095  auto& data_mgr = cat.getDataMgr();
1096  if (!linearlized_temporary_cpu_index_buf_cache_.empty()) {
1097  for (auto& kv : linearlized_temporary_cpu_index_buf_cache_) {
1098  data_mgr.free(kv.second);
1099  }
1100  }
1101 }

const int8_t * ColumnFetcher::getAllTableColumnFragments ( const int  table_id,
const int  col_id,
const std::map< int, const TableFragments * > &  all_tables_fragments,
const Data_Namespace::MemoryLevel  memory_level,
const int  device_id,
DeviceAllocator *  device_allocator,
const size_t  thread_idx 
) const

Definition at line 285 of file ColumnFetcher.cpp.

References CHECK, columnar_fetch_mutex_, columnarized_scan_table_cache_, Data_Namespace::CPU_LEVEL, Executor::ERR_INTERRUPTED, executor_, g_enable_non_kernel_time_query_interrupt, getOneTableColumnFragment(), InputColDescriptor::getScanDesc(), InputDescriptor::getSourceType(), ColumnarResults::mergeResults(), TABLE, and transferColumnIfNeeded().

Referenced by Executor::fetchChunks(), and Executor::fetchUnionChunks().

292  {
293  const auto fragments_it = all_tables_fragments.find(table_id);
294  CHECK(fragments_it != all_tables_fragments.end());
295  const auto fragments = fragments_it->second;
296  const auto frag_count = fragments->size();
297  std::vector<std::unique_ptr<ColumnarResults>> column_frags;
298  const ColumnarResults* table_column = nullptr;
299  const InputColDescriptor col_desc(col_id, table_id, int(0));
300  CHECK(col_desc.getScanDesc().getSourceType() == InputSourceType::TABLE);
301  {
302  std::lock_guard<std::mutex> columnar_conversion_guard(columnar_fetch_mutex_);
303  auto column_it = columnarized_scan_table_cache_.find(col_desc);
304  if (column_it == columnarized_scan_table_cache_.end()) {
305  for (size_t frag_id = 0; frag_id < frag_count; ++frag_id) {
306  if (g_enable_non_kernel_time_query_interrupt &&
307  executor_->checkNonKernelTimeInterrupted()) {
308  throw QueryExecutionError(Executor::ERR_INTERRUPTED);
309  }
310  std::list<std::shared_ptr<Chunk_NS::Chunk>> chunk_holder;
311  std::list<ChunkIter> chunk_iter_holder;
312  const auto& fragment = (*fragments)[frag_id];
313  if (fragment.isEmptyPhysicalFragment()) {
314  continue;
315  }
316  auto chunk_meta_it = fragment.getChunkMetadataMap().find(col_id);
317  CHECK(chunk_meta_it != fragment.getChunkMetadataMap().end());
318  auto col_buffer = getOneTableColumnFragment(table_id,
319  static_cast<int>(frag_id),
320  col_id,
321  all_tables_fragments,
322  chunk_holder,
323  chunk_iter_holder,
324  Data_Namespace::CPU_LEVEL,
325  int(0),
326  device_allocator);
327  column_frags.push_back(
328  std::make_unique<ColumnarResults>(executor_->row_set_mem_owner_,
329  col_buffer,
330  fragment.getNumTuples(),
331  chunk_meta_it->second->sqlType,
332  executor_->executor_id_,
333  thread_idx));
334  }
335  auto merged_results =
336  ColumnarResults::mergeResults(executor_->row_set_mem_owner_, column_frags);
337  table_column = merged_results.get();
338  columnarized_scan_table_cache_.emplace(col_desc, std::move(merged_results));
339  } else {
340  table_column = column_it->second.get();
341  }
342  }
343  return ColumnFetcher::transferColumnIfNeeded(table_column,
344  0,
345  executor_->getDataMgr(),
346  memory_level,
347  device_id,
348  device_allocator);
349 }

const int8_t * ColumnFetcher::getChunkiter ( const InputColDescriptor  col_desc,
const int  device_id = 0 
) const
private

Definition at line 1030 of file ColumnFetcher.cpp.

References InputColDescriptor::getColId(), InputColDescriptor::getScanDesc(), InputDescriptor::getTableId(), linearized_multi_frag_chunk_iter_cache_, and VLOG.

Referenced by linearizeColumnFragments().

1031  {
1032  auto linearized_chunk_iter_it = linearized_multi_frag_chunk_iter_cache_.find(col_desc);
1033  if (linearized_chunk_iter_it != linearized_multi_frag_chunk_iter_cache_.end()) {
1034  auto dev_iter_map_it = linearized_chunk_iter_it->second.find(device_id);
1035  if (dev_iter_map_it != linearized_chunk_iter_it->second.end()) {
1036  VLOG(2) << "Recycle merged chunk_iter for col_desc (tbl: "
1037  << col_desc.getScanDesc().getTableId() << ", col: " << col_desc.getColId()
1038  << "), device_id: " << device_id;
1039  return dev_iter_map_it->second;
1040  }
1041  }
1042  return nullptr;
1043 }

std::pair< const int8_t *, size_t > ColumnFetcher::getOneColumnFragment ( Executor *  executor,
const Analyzer::ColumnVar &  hash_col,
const Fragmenter_Namespace::FragmentInfo &  fragment,
const Data_Namespace::MemoryLevel  effective_mem_lvl,
const int  device_id,
DeviceAllocator *  device_allocator,
const size_t  thread_idx,
std::vector< std::shared_ptr< Chunk_NS::Chunk >> &  chunks_owner,
ColumnCacheMap &  column_cache 
)
static

Gets one chunk's pointer and element count on either CPU or GPU.

Gets a column fragment chunk on CPU or on GPU depending on the effective memory level parameter. For temporary tables, the chunk will be copied to the GPU if needed. Returns a buffer pointer and an element count.
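
For example, a caller preparing a join hash table might fetch the key column's chunk to CPU as follows (a sketch under assumptions: executor, hash_col, and fragment come from the surrounding join framework and are not defined here):

  std::vector<std::shared_ptr<Chunk_NS::Chunk>> chunks_owner;  // pins the chunk
  ColumnCacheMap column_cache;
  const int8_t* col_buff{nullptr};
  size_t elem_count{0};
  std::tie(col_buff, elem_count) =
      ColumnFetcher::getOneColumnFragment(executor,
                                          hash_col,  // Analyzer::ColumnVar of the key
                                          fragment,  // one FragmentInfo entry
                                          Data_Namespace::CPU_LEVEL,  // effective_mem_lvl
                                          /*device_id=*/0,
                                          /*device_allocator=*/nullptr,  // CPU fetch
                                          /*thread_idx=*/0,
                                          chunks_owner,
                                          column_cache);
  // An empty physical fragment yields {nullptr, 0}; chunks_owner must outlive
  // any use of col_buff.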

Definition at line 67 of file ColumnFetcher.cpp.

References CHECK, anonymous_namespace{ColumnFetcher.cpp}::columnarize_result(), Data_Namespace::CPU_LEVEL, DEBUG_TIMER, Fragmenter_Namespace::FragmentInfo::fragmentId, get_column_descriptor_maybe(), Analyzer::ColumnVar::get_column_id(), Analyzer::ColumnVar::get_table_id(), get_temporary_table(), Chunk_NS::Chunk::getChunk(), Fragmenter_Namespace::FragmentInfo::getChunkMetadataMap(), Fragmenter_Namespace::FragmentInfo::getNumTuples(), Fragmenter_Namespace::FragmentInfo::isEmptyPhysicalFragment(), Fragmenter_Namespace::FragmentInfo::physicalTableId, and transferColumnIfNeeded().

Referenced by RelAlgExecutor::createWindowFunctionContext(), TableFunctionExecutionContext::execute(), and makeJoinColumn().

76  {
77  static std::mutex columnar_conversion_mutex;
78  auto timer = DEBUG_TIMER(__func__);
79  if (fragment.isEmptyPhysicalFragment()) {
80  return {nullptr, 0};
81  }
82  const auto table_id = hash_col.get_table_id();
83  const auto& catalog = *executor->getCatalog();
84  const auto cd =
85  get_column_descriptor_maybe(hash_col.get_column_id(), table_id, catalog);
86  CHECK(!cd || !(cd->isVirtualCol));
87  const int8_t* col_buff = nullptr;
88  if (cd) { // real table
89  /* chunk_meta_it is used here to retrieve chunk numBytes and
90  numElements. Apparently, their values are often zeros. If we
91  knew how to predict the zero values, calling
92  getChunkMetadataMap could be avoided to skip
93  synthesize_metadata calls. */
94  auto chunk_meta_it = fragment.getChunkMetadataMap().find(hash_col.get_column_id());
95  CHECK(chunk_meta_it != fragment.getChunkMetadataMap().end());
96  ChunkKey chunk_key{catalog.getCurrentDB().dbId,
97  fragment.physicalTableId,
98  hash_col.get_column_id(),
99  fragment.fragmentId};
100  const auto chunk = Chunk_NS::Chunk::getChunk(
101  cd,
102  &catalog.getDataMgr(),
103  chunk_key,
104  effective_mem_lvl,
105  effective_mem_lvl == Data_Namespace::CPU_LEVEL ? 0 : device_id,
106  chunk_meta_it->second->numBytes,
107  chunk_meta_it->second->numElements);
108  chunks_owner.push_back(chunk);
109  CHECK(chunk);
110  auto ab = chunk->getBuffer();
111  CHECK(ab->getMemoryPtr());
112  col_buff = reinterpret_cast<int8_t*>(ab->getMemoryPtr());
113  } else { // temporary table
114  const ColumnarResults* col_frag{nullptr};
115  {
116  std::lock_guard<std::mutex> columnar_conversion_guard(columnar_conversion_mutex);
117  const auto frag_id = fragment.fragmentId;
118  if (column_cache.empty() || !column_cache.count(table_id)) {
119  column_cache.insert(std::make_pair(
120  table_id, std::unordered_map<int, std::shared_ptr<const ColumnarResults>>()));
121  }
122  auto& frag_id_to_result = column_cache[table_id];
123  if (frag_id_to_result.empty() || !frag_id_to_result.count(frag_id)) {
124  frag_id_to_result.insert(
125  std::make_pair(frag_id,
126  std::shared_ptr<const ColumnarResults>(columnarize_result(
127  executor->row_set_mem_owner_,
128  get_temporary_table(executor->temporary_tables_, table_id),
129  executor->executor_id_,
130  thread_idx,
131  frag_id))));
132  }
133  col_frag = column_cache[table_id][frag_id].get();
134  }
135  col_buff = transferColumnIfNeeded(
136  col_frag,
137  hash_col.get_column_id(),
138  &catalog.getDataMgr(),
139  effective_mem_lvl,
140  effective_mem_lvl == Data_Namespace::CPU_LEVEL ? 0 : device_id,
141  device_allocator);
142  }
143  return {col_buff, fragment.getNumTuples()};
144 }

const int8_t * ColumnFetcher::getOneTableColumnFragment ( const int  table_id,
const int  frag_id,
const int  col_id,
const std::map< int, const TableFragments * > &  all_tables_fragments,
std::list< std::shared_ptr< Chunk_NS::Chunk >> &  chunk_holder,
std::list< ChunkIter > &  chunk_iter_holder,
const Data_Namespace::MemoryLevel  memory_level,
const int  device_id,
DeviceAllocator *  device_allocator 
) const

Definition at line 210 of file ColumnFetcher.cpp.

References Allocator::alloc(), cat(), CHECK, CHECK_EQ, CHECK_GT, chunk_list_mutex_, DeviceAllocator::copyToDevice(), Data_Namespace::CPU_LEVEL, executor_, get_column_descriptor(), get_column_type(), Chunk_NS::Chunk::getChunk(), Data_Namespace::GPU_LEVEL, kENCODING_NONE, and varlen_chunk_fetch_mutex_.

Referenced by Executor::fetchChunks(), Executor::fetchUnionChunks(), and getAllTableColumnFragments().

219  {
220  const auto fragments_it = all_tables_fragments.find(table_id);
221  CHECK(fragments_it != all_tables_fragments.end());
222  const auto fragments = fragments_it->second;
223  const auto& fragment = (*fragments)[frag_id];
224  if (fragment.isEmptyPhysicalFragment()) {
225  return nullptr;
226  }
227  std::shared_ptr<Chunk_NS::Chunk> chunk;
228  auto chunk_meta_it = fragment.getChunkMetadataMap().find(col_id);
229  CHECK(chunk_meta_it != fragment.getChunkMetadataMap().end());
230  CHECK(table_id > 0);
231  const auto& cat = *executor_->getCatalog();
232  auto cd = get_column_descriptor(col_id, table_id, cat);
233  CHECK(cd);
234  const auto col_type =
235  get_column_type(col_id, table_id, cd, executor_->temporary_tables_);
236  const bool is_real_string =
237  col_type.is_string() && col_type.get_compression() == kENCODING_NONE;
238  const bool is_varlen =
239  is_real_string ||
240  col_type.is_array(); // TODO: should it be col_type.is_varlen_array() ?
241  {
242  ChunkKey chunk_key{
243  cat.getCurrentDB().dbId, fragment.physicalTableId, col_id, fragment.fragmentId};
244  std::unique_ptr<std::lock_guard<std::mutex>> varlen_chunk_lock;
245  if (is_varlen) {
246  varlen_chunk_lock.reset(new std::lock_guard<std::mutex>(varlen_chunk_fetch_mutex_));
247  }
248  chunk = Chunk_NS::Chunk::getChunk(
249  cd,
250  &cat.getDataMgr(),
251  chunk_key,
252  memory_level,
253  memory_level == Data_Namespace::CPU_LEVEL ? 0 : device_id,
254  chunk_meta_it->second->numBytes,
255  chunk_meta_it->second->numElements);
256  std::lock_guard<std::mutex> chunk_list_lock(chunk_list_mutex_);
257  chunk_holder.push_back(chunk);
258  }
259  if (is_varlen) {
260  CHECK_GT(table_id, 0);
261  CHECK(chunk_meta_it != fragment.getChunkMetadataMap().end());
262  chunk_iter_holder.push_back(chunk->begin_iterator(chunk_meta_it->second));
263  auto& chunk_iter = chunk_iter_holder.back();
264  if (memory_level == Data_Namespace::CPU_LEVEL) {
265  return reinterpret_cast<int8_t*>(&chunk_iter);
266  } else {
267  auto ab = chunk->getBuffer();
268  ab->pin();
269  auto& row_set_mem_owner = executor_->getRowSetMemoryOwner();
270  row_set_mem_owner->addVarlenInputBuffer(ab);
271  CHECK_EQ(Data_Namespace::GPU_LEVEL, memory_level);
272  CHECK(allocator);
273  auto chunk_iter_gpu = allocator->alloc(sizeof(ChunkIter));
274  allocator->copyToDevice(
275  chunk_iter_gpu, reinterpret_cast<int8_t*>(&chunk_iter), sizeof(ChunkIter));
276  return chunk_iter_gpu;
277  }
278  } else {
279  auto ab = chunk->getBuffer();
280  CHECK(ab->getMemoryPtr());
281  return ab->getMemoryPtr(); // @TODO(alex) change to use ChunkIter
282  }
283 }

const int8_t * ColumnFetcher::getResultSetColumn ( const InputColDescriptor *  col_desc,
const Data_Namespace::MemoryLevel  memory_level,
const int  device_id,
DeviceAllocator *  device_allocator,
const size_t  thread_idx 
) const

Definition at line 351 of file ColumnFetcher.cpp.

References CHECK, executor_, get_temporary_table(), InputColDescriptor::getColId(), InputColDescriptor::getScanDesc(), and InputDescriptor::getTableId().

Referenced by Executor::fetchChunks(), and Executor::fetchUnionChunks().

356  {
357  CHECK(col_desc);
358  const auto table_id = col_desc->getScanDesc().getTableId();
359  return getResultSetColumn(get_temporary_table(executor_->temporary_tables_, table_id),
360  table_id,
361  col_desc->getColId(),
362  memory_level,
363  device_id,
364  device_allocator,
365  thread_idx);
366 }

const int8_t * ColumnFetcher::getResultSetColumn ( const ResultSetPtr &  buffer,
const int  table_id,
const int  col_id,
const Data_Namespace::MemoryLevel  memory_level,
const int  device_id,
DeviceAllocator *  device_allocator,
const size_t  thread_idx 
) const
private

Definition at line 1103 of file ColumnFetcher.cpp.

References CHECK_GE, CHECK_NE, columnar_fetch_mutex_, anonymous_namespace{ColumnFetcher.cpp}::columnarize_result(), columnarized_table_cache_, executor_, run_benchmark_import::result, and transferColumnIfNeeded().

1110  {
1111  const ColumnarResults* result{nullptr};
1112  {
1113  std::lock_guard<std::mutex> columnar_conversion_guard(columnar_fetch_mutex_);
1114  if (columnarized_table_cache_.empty() || !columnarized_table_cache_.count(table_id)) {
1115  columnarized_table_cache_.insert(std::make_pair(
1116  table_id, std::unordered_map<int, std::shared_ptr<const ColumnarResults>>()));
1117  }
1118  auto& frag_id_to_result = columnarized_table_cache_[table_id];
1119  int frag_id = 0;
1120  if (frag_id_to_result.empty() || !frag_id_to_result.count(frag_id)) {
1121  frag_id_to_result.insert(
1122  std::make_pair(frag_id,
1123  std::shared_ptr<const ColumnarResults>(
1124  columnarize_result(executor_->row_set_mem_owner_,
1125  buffer,
1126  executor_->executor_id_,
1127  thread_idx,
1128  frag_id))));
1129  }
1130  CHECK_NE(size_t(0), columnarized_table_cache_.count(table_id));
1131  result = columnarized_table_cache_[table_id][frag_id].get();
1132  }
1133  CHECK_GE(col_id, 0);
1134  return transferColumnIfNeeded(
1135  result, col_id, executor_->getDataMgr(), memory_level, device_id, device_allocator);
1136 }

const int8_t * ColumnFetcher::linearizeColumnFragments ( const int  table_id,
const int  col_id,
const std::map< int, const TableFragments * > &  all_tables_fragments,
std::list< std::shared_ptr< Chunk_NS::Chunk >> &  chunk_holder,
std::list< ChunkIter > &  chunk_iter_holder,
const Data_Namespace::MemoryLevel  memory_level,
const int  device_id,
DeviceAllocator *  device_allocator,
const size_t  thread_idx 
) const

Definition at line 368 of file ColumnFetcher.cpp.

References addMergedChunkIter(), Allocator::alloc(), cat(), CHECK, CHECK_EQ, CHECK_GT, chunk_list_mutex_, DeviceAllocator::copyToDevice(), Data_Namespace::CPU_LEVEL, DEBUG_TIMER, executor_, get_column_descriptor(), Chunk_NS::Chunk::getChunk(), getChunkiter(), anonymous_namespace{ColumnFetcher.cpp}::getMemoryLevelString(), InputColDescriptor::getScanDesc(), InputDescriptor::getSourceType(), Data_Namespace::GPU_LEVEL, linearization_mutex_, linearized_col_cache_mutex_, linearized_multi_frag_chunk_iter_cache_, linearizeFixedLenArrayColFrags(), linearizeVarLenArrayColFrags(), prepareChunkIter(), run_benchmark_import::res, TABLE, varlen_chunk_fetch_mutex_, and VLOG.

Referenced by Executor::fetchChunks().

377  {
378  auto timer = DEBUG_TIMER(__func__);
379  const auto fragments_it = all_tables_fragments.find(table_id);
380  CHECK(fragments_it != all_tables_fragments.end());
381  const auto fragments = fragments_it->second;
382  const auto frag_count = fragments->size();
383  const InputColDescriptor col_desc(col_id, table_id, int(0));
384  const auto& cat = *executor_->getCatalog();
385  auto cd = get_column_descriptor(col_id, table_id, cat);
386  CHECK(cd);
387  CHECK(col_desc.getScanDesc().getSourceType() == InputSourceType::TABLE);
388  CHECK_GT(table_id, 0);
389  bool is_varlen_chunk = cd->columnType.is_varlen() && !cd->columnType.is_fixlen_array();
390  size_t total_num_tuples = 0;
391  size_t total_data_buf_size = 0;
392  size_t total_idx_buf_size = 0;
393  {
394  std::lock_guard<std::mutex> linearize_guard(linearized_col_cache_mutex_);
395  auto linearized_iter_it = linearized_multi_frag_chunk_iter_cache_.find(col_desc);
396  if (linearized_iter_it != linearized_multi_frag_chunk_iter_cache_.end()) {
397  if (memory_level == CPU_LEVEL) {
398  // in CPU execution, each kernel can share merged chunk since they operates in the
399  // same memory space, so we can share the same chunk iter among kernels
400  return getChunkiter(col_desc, 0);
401  } else {
402  // in GPU execution, this becomes the matter when we deploy multi-GPUs
403  // so we only share the chunk_iter iff kernels are launched on the same GPU device
404  // otherwise we need to separately load merged chunk and its iter
405  // todo(yoonmin): D2D copy of merged chunk and its iter?
406  if (linearized_iter_it->second.find(device_id) !=
407  linearized_iter_it->second.end()) {
408  // note that cached chunk_iter is located on CPU memory space...
409  // we just need to copy it to each device
410  // chunk_iter already contains correct buffer addr depending on execution device
411  auto chunk_iter_gpu = device_allocator->alloc(sizeof(ChunkIter));
412  device_allocator->copyToDevice(
413  chunk_iter_gpu, getChunkiter(col_desc, device_id), sizeof(ChunkIter));
414  return chunk_iter_gpu;
415  }
416  }
417  }
418  }
419 
420  // collect target fragments
421  // basically we load chunk in CPU first, and do necessary manipulation
422  // to make semantics of a merged chunk correctly
423  std::shared_ptr<Chunk_NS::Chunk> chunk;
424  std::list<std::shared_ptr<Chunk_NS::Chunk>> local_chunk_holder;
425  std::list<ChunkIter> local_chunk_iter_holder;
426  std::list<size_t> local_chunk_num_tuples;
427  {
428  std::lock_guard<std::mutex> linearize_guard(varlen_chunk_fetch_mutex_);
429  for (size_t frag_id = 0; frag_id < frag_count; ++frag_id) {
430  const auto& fragment = (*fragments)[frag_id];
431  if (fragment.isEmptyPhysicalFragment()) {
432  continue;
433  }
434  auto chunk_meta_it = fragment.getChunkMetadataMap().find(col_id);
435  CHECK(chunk_meta_it != fragment.getChunkMetadataMap().end());
436  ChunkKey chunk_key{
437  cat.getCurrentDB().dbId, fragment.physicalTableId, col_id, fragment.fragmentId};
438  chunk = Chunk_NS::Chunk::getChunk(cd,
439  &cat.getDataMgr(),
440  chunk_key,
441  Data_Namespace::CPU_LEVEL,
442  0,
443  chunk_meta_it->second->numBytes,
444  chunk_meta_it->second->numElements);
445  local_chunk_holder.push_back(chunk);
446  auto chunk_iter = chunk->begin_iterator(chunk_meta_it->second);
447  local_chunk_iter_holder.push_back(chunk_iter);
448  local_chunk_num_tuples.push_back(fragment.getNumTuples());
449  total_num_tuples += fragment.getNumTuples();
450  total_data_buf_size += chunk->getBuffer()->size();
451  std::ostringstream oss;
452  oss << "Load chunk for col_name: " << chunk->getColumnDesc()->columnName
453  << ", col_id: " << chunk->getColumnDesc()->columnId << ", Frag-" << frag_id
454  << ", numTuples: " << fragment.getNumTuples()
455  << ", data_size: " << chunk->getBuffer()->size();
456  if (chunk->getIndexBuf()) {
457  auto idx_buf_size = chunk->getIndexBuf()->size() - sizeof(ArrayOffsetT);
458  oss << ", index_size: " << idx_buf_size;
459  total_idx_buf_size += idx_buf_size;
460  }
461  VLOG(2) << oss.str();
462  }
463  }
464 
465  auto& col_ti = cd->columnType;
466  MergedChunk res{nullptr, nullptr};
467  // Do linearize multi-fragmented column depending on column type
468  // We cover array and non-encoded text columns
469  // Note that geo column is actually organized as a set of arrays
470  // and each geo object has different set of vectors that they require
471  // Here, we linearize each array at a time, so eventually the geo object has a set of
472  // "linearized" arrays
473  {
474  std::lock_guard<std::mutex> linearization_guard(linearization_mutex_);
475  if (col_ti.is_array()) {
476  if (col_ti.is_fixlen_array()) {
477  VLOG(2) << "Linearize fixed-length multi-frag array column (col_id: "
478  << cd->columnId << ", col_name: " << cd->columnName
479  << ", device_type: " << getMemoryLevelString(memory_level)
480  << ", device_id: " << device_id << "): " << cd->columnType.to_string();
481  res = linearizeFixedLenArrayColFrags(cat,
482  chunk_holder,
483  chunk_iter_holder,
484  local_chunk_holder,
485  local_chunk_iter_holder,
486  local_chunk_num_tuples,
487  memory_level,
488  cd,
489  device_id,
490  total_data_buf_size,
491  total_idx_buf_size,
492  total_num_tuples,
493  device_allocator,
494  thread_idx);
495  } else {
496  CHECK(col_ti.is_varlen_array());
497  VLOG(2) << "Linearize variable-length multi-frag array column (col_id: "
498  << cd->columnId << ", col_name: " << cd->columnName
499  << ", device_type: " << getMemoryLevelString(memory_level)
500  << ", device_id: " << device_id << "): " << cd->columnType.to_string();
501  res = linearizeVarLenArrayColFrags(cat,
502  chunk_holder,
503  chunk_iter_holder,
504  local_chunk_holder,
505  local_chunk_iter_holder,
506  local_chunk_num_tuples,
507  memory_level,
508  cd,
509  device_id,
510  total_data_buf_size,
511  total_idx_buf_size,
512  total_num_tuples,
513  device_allocator,
514  thread_idx);
515  }
516  }
517  if (col_ti.is_string() && !col_ti.is_dict_encoded_string()) {
518  VLOG(2) << "Linearize variable-length multi-frag non-encoded text column (col_id: "
519  << cd->columnId << ", col_name: " << cd->columnName
520  << ", device_type: " << getMemoryLevelString(memory_level)
521  << ", device_id: " << device_id << "): " << cd->columnType.to_string();
522  res = linearizeVarLenArrayColFrags(cat,
523  chunk_holder,
524  chunk_iter_holder,
525  local_chunk_holder,
526  local_chunk_iter_holder,
527  local_chunk_num_tuples,
528  memory_level,
529  cd,
530  device_id,
531  total_data_buf_size,
532  total_idx_buf_size,
533  total_num_tuples,
534  device_allocator,
535  thread_idx);
536  }
537  }
538  CHECK(res.first); // check merged data buffer
539  if (!col_ti.is_fixlen_array()) {
540  CHECK(res.second); // check merged index buffer
541  }
542  auto merged_data_buffer = res.first;
543  auto merged_index_buffer = res.second;
544 
545  // prepare ChunkIter for the linearized chunk
546  auto merged_chunk =
547  std::make_shared<Chunk_NS::Chunk>(merged_data_buffer, merged_index_buffer, cd);
548  // to prepare chunk_iter for the merged chunk, we pass one of local chunk iter
549  // to fill necessary metadata that is a common for all merged chunks
550  auto merged_chunk_iter = prepareChunkIter(merged_data_buffer,
551  merged_index_buffer,
552  *(local_chunk_iter_holder.rbegin()),
553  is_varlen_chunk,
554  total_num_tuples);
555  {
556  std::lock_guard<std::mutex> chunk_list_lock(chunk_list_mutex_);
557  chunk_holder.push_back(merged_chunk);
558  chunk_iter_holder.push_back(merged_chunk_iter);
559  }
560 
561  auto merged_chunk_iter_ptr = reinterpret_cast<int8_t*>(&(chunk_iter_holder.back()));
562  if (memory_level == MemoryLevel::CPU_LEVEL) {
563  addMergedChunkIter(col_desc, 0, merged_chunk_iter_ptr);
564  return merged_chunk_iter_ptr;
565  } else {
566  CHECK_EQ(Data_Namespace::GPU_LEVEL, memory_level);
567  CHECK(device_allocator);
568  addMergedChunkIter(col_desc, device_id, merged_chunk_iter_ptr);
569  // note that merged_chunk_iter_ptr resides in CPU memory space
570  // having its content aware GPU buffer that we alloc. for merging
571  // so we need to copy this chunk_iter to each device explicitly
572  auto chunk_iter_gpu = device_allocator->alloc(sizeof(ChunkIter));
573  device_allocator->copyToDevice(
574  chunk_iter_gpu, merged_chunk_iter_ptr, sizeof(ChunkIter));
575  return chunk_iter_gpu;
576  }
577 }
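
For reference, the caller-side dispatch (cf. Executor::fetchChunks) has roughly the shape sketched below; the needs_linearization predicate is a simplification of the actual logic, and the surrounding variables are assumed to be in scope:

  // A varlen column spanning multiple fragments cannot be walked with a single
  // ChunkIter, so its fragments are merged first; otherwise the ordinary
  // per-fragment fetch is used.
  const bool needs_linearization = col_type.is_varlen() && fragments->size() > 1;
  const int8_t* col_buf =
      needs_linearization
          ? column_fetcher.linearizeColumnFragments(table_id,
                                                    col_id,
                                                    all_tables_fragments,
                                                    chunk_holder,
                                                    chunk_iter_holder,
                                                    memory_level,
                                                    device_id,
                                                    device_allocator,
                                                    thread_idx)
          : column_fetcher.getOneTableColumnFragment(table_id,
                                                     frag_id,
                                                     col_id,
                                                     all_tables_fragments,
                                                     chunk_holder,
                                                     chunk_iter_holder,
                                                     memory_level,
                                                     device_id,
                                                     device_allocator);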

MergedChunk ColumnFetcher::linearizeFixedLenArrayColFrags ( const Catalog_Namespace::Catalog &  cat,
std::list< std::shared_ptr< Chunk_NS::Chunk >> &  chunk_holder,
std::list< ChunkIter > &  chunk_iter_holder,
std::list< std::shared_ptr< Chunk_NS::Chunk >> &  local_chunk_holder,
std::list< ChunkIter > &  local_chunk_iter_holder,
std::list< size_t > &  local_chunk_num_tuples,
MemoryLevel  memory_level,
const ColumnDescriptor *  cd,
const int  device_id,
const size_t  total_data_buf_size,
const size_t  total_idx_buf_size,
const size_t  total_num_tuples,
DeviceAllocator *  device_allocator,
const size_t  thread_idx 
) const
private

Definition at line 904 of file ColumnFetcher.cpp.

References Data_Namespace::DataMgr::alloc(), Data_Namespace::AbstractBuffer::append(), CHECK_EQ, check_interrupt(), ColumnDescriptor::columnId, Data_Namespace::CPU_LEVEL, Executor::ERR_INTERRUPTED, g_enable_non_kernel_time_query_interrupt, Catalog_Namespace::Catalog::getDataMgr(), anonymous_namespace{ColumnFetcher.cpp}::getMemoryLevelString(), linearized_col_cache_mutex_, linearized_data_buf_cache_, ColumnDescriptor::tableId, timer_start(), timer_stop(), and VLOG.

Referenced by linearizeColumnFragments().

918  {
919  int64_t linearization_time_ms = 0;
920  auto clock_begin = timer_start();
921  // linearize collected fragments
922  AbstractBuffer* merged_data_buffer = nullptr;
923  bool has_cached_merged_data_buf = false;
924  const InputColDescriptor icd(cd->columnId, cd->tableId, int(0));
925  {
926  std::lock_guard<std::mutex> linearized_col_cache_guard(linearized_col_cache_mutex_);
927  auto cached_data_buf_cache_it = linearized_data_buf_cache_.find(icd);
928  if (cached_data_buf_cache_it != linearized_data_buf_cache_.end()) {
929  auto& cd_cache = cached_data_buf_cache_it->second;
930  auto cached_data_buf_it = cd_cache.find(device_id);
931  if (cached_data_buf_it != cd_cache.end()) {
932  has_cached_merged_data_buf = true;
933  merged_data_buffer = cached_data_buf_it->second;
934  VLOG(2) << "Recycle merged data buffer for linearized chunks (memory_level: "
935  << getMemoryLevelString(memory_level) << ", device_id: " << device_id
936  << ")";
937  } else {
938  merged_data_buffer =
939  cat.getDataMgr().alloc(memory_level, device_id, total_data_buf_size);
940  VLOG(2) << "Allocate " << total_data_buf_size
941  << " bytes of data buffer space for linearized chunks (memory_level: "
942  << getMemoryLevelString(memory_level) << ", device_id: " << device_id
943  << ")";
944  cd_cache.insert(std::make_pair(device_id, merged_data_buffer));
945  }
946  } else {
947  DeviceMergedChunkMap m;
948  merged_data_buffer =
949  cat.getDataMgr().alloc(memory_level, device_id, total_data_buf_size);
950  VLOG(2) << "Allocate " << total_data_buf_size
951  << " bytes of data buffer space for linearized chunks (memory_level: "
952  << getMemoryLevelString(memory_level) << ", device_id: " << device_id
953  << ")";
954  m.insert(std::make_pair(device_id, merged_data_buffer));
955  linearized_data_buf_cache_.insert(std::make_pair(icd, m));
956  }
957  }
958  if (!has_cached_merged_data_buf) {
959  size_t sum_data_buf_size = 0;
960  auto chunk_holder_it = local_chunk_holder.begin();
961  auto chunk_iter_holder_it = local_chunk_iter_holder.begin();
962  for (; chunk_holder_it != local_chunk_holder.end();
963  chunk_holder_it++, chunk_iter_holder_it++) {
964  if (g_enable_non_kernel_time_query_interrupt && check_interrupt()) {
965  throw QueryExecutionError(Executor::ERR_INTERRUPTED);
966  }
967  auto target_chunk = chunk_holder_it->get();
968  auto target_chunk_data_buffer = target_chunk->getBuffer();
969  merged_data_buffer->append(target_chunk_data_buffer->getMemoryPtr(),
970  target_chunk_data_buffer->size(),
971  Data_Namespace::CPU_LEVEL,
972  device_id);
973  sum_data_buf_size += target_chunk_data_buffer->size();
974  }
975  // check whether each chunk's data buffer is clean under chunk merging
976  CHECK_EQ(total_data_buf_size, sum_data_buf_size);
977  }
978  linearization_time_ms += timer_stop(clock_begin);
979  VLOG(2) << "Linearization has been successfully done, elapsed time: "
980  << linearization_time_ms << " ms.";
981  return {merged_data_buffer, nullptr};
982 }

MergedChunk ColumnFetcher::linearizeVarLenArrayColFrags ( const Catalog_Namespace::Catalog &  cat,
std::list< std::shared_ptr< Chunk_NS::Chunk >> &  chunk_holder,
std::list< ChunkIter > &  chunk_iter_holder,
std::list< std::shared_ptr< Chunk_NS::Chunk >> &  local_chunk_holder,
std::list< ChunkIter > &  local_chunk_iter_holder,
std::list< size_t > &  local_chunk_num_tuples,
MemoryLevel  memory_level,
const ColumnDescriptor *  cd,
const int  device_id,
const size_t  total_data_buf_size,
const size_t  total_idx_buf_size,
const size_t  total_num_tuples,
DeviceAllocator *  device_allocator,
const size_t  thread_idx 
) const
private

Definition at line 579 of file ColumnFetcher.cpp.

References Data_Namespace::DataMgr::alloc(), Data_Namespace::AbstractBuffer::append(), threading_serial::async(), CHECK, ColumnDescriptor::columnId, gpu_enabled::copy(), DeviceAllocator::copyToDevice(), Data_Namespace::CPU_LEVEL, cpu_threads(), ArrayNoneEncoder::DEFAULT_NULL_PADDING_SIZE, run_benchmark_import::dest, Executor::ERR_INTERRUPTED, executor_, g_enable_non_kernel_time_query_interrupt, g_enable_parallel_linearization, Catalog_Namespace::Catalog::getDataMgr(), anonymous_namespace{ColumnFetcher.cpp}::getMemoryLevelString(), Data_Namespace::AbstractBuffer::getMemoryPtr(), Data_Namespace::GPU_LEVEL, i, LIKELY, linearized_col_cache_mutex_, linearized_data_buf_cache_, linearized_idx_buf_cache_, linearlized_temporary_cpu_index_buf_cache_, makeIntervals(), gpu_enabled::sort(), ColumnDescriptor::tableId, timer_start(), timer_stop(), and VLOG.

Referenced by linearizeColumnFragments().

593  {
594  // for linearization of varlen col we have to deal with not only data buffer
595  // but also its underlying index buffer which is responsible for offset of varlen value
596  // basically we maintain per-device linearized (data/index) buffer
597  // for data buffer, we linearize varlen col's chunks within a device-specific buffer
598  // by just appending each chunk
599  // for index buffer, we need to not only appending each chunk but modify the offset
600  // value to affect various conditions like nullness, padding and so on so we first
601  // append index buffer in CPU, manipulate it as we required and then copy it to specific
602  // device if necessary (for GPU execution)
603  AbstractBuffer* merged_index_buffer_in_cpu = nullptr;
604  AbstractBuffer* merged_data_buffer = nullptr;
605  bool has_cached_merged_idx_buf = false;
606  bool has_cached_merged_data_buf = false;
607  const InputColDescriptor icd(cd->columnId, cd->tableId, int(0));
608  // check linearized buffer's cache first
609  // if not exists, alloc necessary buffer space to prepare linearization
610  int64_t linearization_time_ms = 0;
611  auto clock_begin = timer_start();
612  {
613  std::lock_guard<std::mutex> linearized_col_cache_guard(linearized_col_cache_mutex_);
614  auto cached_data_buf_cache_it = linearized_data_buf_cache_.find(icd);
615  if (cached_data_buf_cache_it != linearized_data_buf_cache_.end()) {
616  auto& cd_cache = cached_data_buf_cache_it->second;
617  auto cached_data_buf_it = cd_cache.find(device_id);
618  if (cached_data_buf_it != cd_cache.end()) {
619  has_cached_merged_data_buf = true;
620  merged_data_buffer = cached_data_buf_it->second;
621  VLOG(2) << "Recycle merged data buffer for linearized chunks (memory_level: "
622  << getMemoryLevelString(memory_level) << ", device_id: " << device_id
623  << ")";
624  } else {
625  merged_data_buffer =
626  cat.getDataMgr().alloc(memory_level, device_id, total_data_buf_size);
627  VLOG(2) << "Allocate " << total_data_buf_size
628  << " bytes of data buffer space for linearized chunks (memory_level: "
629  << getMemoryLevelString(memory_level) << ", device_id: " << device_id
630  << ")";
631  cd_cache.insert(std::make_pair(device_id, merged_data_buffer));
632  }
633  } else {
634  DeviceMergedChunkMap m;
635  merged_data_buffer =
636  cat.getDataMgr().alloc(memory_level, device_id, total_data_buf_size);
637  VLOG(2) << "Allocate " << total_data_buf_size
638  << " bytes of data buffer space for linearized chunks (memory_level: "
639  << getMemoryLevelString(memory_level) << ", device_id: " << device_id
640  << ")";
641  m.insert(std::make_pair(device_id, merged_data_buffer));
642  linearized_data_buf_cache_.insert(std::make_pair(icd, m));
643  }
644 
645  auto cached_index_buf_it =
646  linearlized_temporary_cpu_index_buf_cache_.find(cd->columnId);
647  if (cached_index_buf_it != linearlized_temporary_cpu_index_buf_cache_.end()) {
648  has_cached_merged_idx_buf = true;
649  merged_index_buffer_in_cpu = cached_index_buf_it->second;
650  VLOG(2)
651  << "Recycle merged temporary idx buffer for linearized chunks (memory_level: "
652  << getMemoryLevelString(memory_level) << ", device_id: " << device_id << ")";
653  } else {
654  auto idx_buf_size = total_idx_buf_size + sizeof(ArrayOffsetT);
655  merged_index_buffer_in_cpu =
656  cat.getDataMgr().alloc(Data_Namespace::CPU_LEVEL, 0, idx_buf_size);
657  VLOG(2) << "Allocate " << idx_buf_size
658  << " bytes of temporary idx buffer space on CPU for linearized chunks";
659  // just copy the buf addr since we access it via the pointer itself
660  linearlized_temporary_cpu_index_buf_cache_.insert(
661  std::make_pair(cd->columnId, merged_index_buffer_in_cpu));
662  }
663  }
664 
665  // linearize buffers if we don't have corresponding buf in cache
666  size_t sum_data_buf_size = 0;
667  size_t sum_idx_buf_size = 0;
668  size_t cur_sum_num_tuples = 0;
669  size_t total_idx_size_modifier = 0;
670  auto chunk_holder_it = local_chunk_holder.begin();
671  auto chunk_iter_holder_it = local_chunk_iter_holder.begin();
672  auto chunk_num_tuple_it = local_chunk_num_tuples.begin();
673  bool null_padded_first_elem = false;
674  bool null_padded_last_val = false;
675  // before entering the actual linearization part, we first need to check
676  // the overflow case where the sum of index offset becomes larger than 2GB
677  // which currently incurs incorrect query result due to negative array offset
678  // note that we can separate this from the main linearization logic b/c
679  // we just need to see few last elems
680  // todo (yoonmin) : relax this to support larger chunk size (>2GB)
681  for (; chunk_holder_it != local_chunk_holder.end();
682  chunk_holder_it++, chunk_num_tuple_it++) {
683  // check the offset overflow based on the last "valid" offset for each chunk
684  auto target_chunk = chunk_holder_it->get();
685  auto target_chunk_data_buffer = target_chunk->getBuffer();
686  auto target_chunk_idx_buffer = target_chunk->getIndexBuf();
687  auto target_idx_buf_ptr =
688  reinterpret_cast<ArrayOffsetT*>(target_chunk_idx_buffer->getMemoryPtr());
689  auto cur_chunk_num_tuples = *chunk_num_tuple_it;
690  ArrayOffsetT original_offset = -1;
691  size_t cur_idx = cur_chunk_num_tuples;
692  // find the valid (e.g., non-null) offset starting from the last elem
693  while (original_offset < 0) {
694  original_offset = target_idx_buf_ptr[--cur_idx];
695  }
696  ArrayOffsetT new_offset = original_offset + sum_data_buf_size;
697  if (new_offset < 0) {
698  throw std::runtime_error(
699  "Linearization of a variable-length column having chunk size larger than 2GB "
700  "not supported yet");
701  }
702  sum_data_buf_size += target_chunk_data_buffer->size();
703  }
704  chunk_holder_it = local_chunk_holder.begin();
705  chunk_num_tuple_it = local_chunk_num_tuples.begin();
706  sum_data_buf_size = 0;
707 
708  for (; chunk_holder_it != local_chunk_holder.end();
709  chunk_holder_it++, chunk_iter_holder_it++, chunk_num_tuple_it++) {
710  if (g_enable_non_kernel_time_query_interrupt &&
711  executor_->checkNonKernelTimeInterrupted()) {
712  throw QueryExecutionError(Executor::ERR_INTERRUPTED);
713  }
714  auto target_chunk = chunk_holder_it->get();
715  auto target_chunk_data_buffer = target_chunk->getBuffer();
716  auto cur_chunk_num_tuples = *chunk_num_tuple_it;
717  auto target_chunk_idx_buffer = target_chunk->getIndexBuf();
718  auto target_idx_buf_ptr =
719  reinterpret_cast<ArrayOffsetT*>(target_chunk_idx_buffer->getMemoryPtr());
720  auto idx_buf_size = target_chunk_idx_buffer->size() - sizeof(ArrayOffsetT);
721  auto target_data_buffer_start_ptr = target_chunk_data_buffer->getMemoryPtr();
722  auto target_data_buffer_size = target_chunk_data_buffer->size();
723 
724  // when linearizing idx buffers, we need to consider the following cases
725  // 1. the first idx val is padded (a. null / b. empty varlen arr / c. 1-byte size
726  // varlen arr, i.e., {1})
727  // 2. the last idx val is null
728  // 3. null value(s) is/are located in a middle of idx buf <-- we don't need to care
729  if (cur_sum_num_tuples > 0 && target_idx_buf_ptr[0] > 0) {
730  null_padded_first_elem = true;
731  target_data_buffer_start_ptr += ArrayNoneEncoder::DEFAULT_NULL_PADDING_SIZE;
732  target_data_buffer_size -= ArrayNoneEncoder::DEFAULT_NULL_PADDING_SIZE;
733  total_idx_size_modifier += ArrayNoneEncoder::DEFAULT_NULL_PADDING_SIZE;
734  }
735  // we linearize data_buf in device-specific buffer
736  if (!has_cached_merged_data_buf) {
737  merged_data_buffer->append(target_data_buffer_start_ptr,
738  target_data_buffer_size,
739  Data_Namespace::CPU_LEVEL,
740  device_id);
741  }
742 
743  if (!has_cached_merged_idx_buf) {
744  // linearize idx buf in CPU first
745  merged_index_buffer_in_cpu->append(target_chunk_idx_buffer->getMemoryPtr(),
746  idx_buf_size,
747  Data_Namespace::CPU_LEVEL,
748  0); // merged_index_buffer_in_cpu resides in CPU
749  auto idx_buf_ptr =
750  reinterpret_cast<ArrayOffsetT*>(merged_index_buffer_in_cpu->getMemoryPtr());
751  // here, we do not need to manipulate the very first idx buf, just let it as is
752  // and modify otherwise (i.e., starting from second chunk idx buf)
753  if (cur_sum_num_tuples > 0) {
754  if (null_padded_last_val) {
755  // case 2. the previous chunk's last index val is null so we need to set this
756  // chunk's first val to be null
757  idx_buf_ptr[cur_sum_num_tuples] = -sum_data_buf_size;
758  }
759  const size_t worker_count = cpu_threads();
760  std::vector<std::future<void>> conversion_threads;
761  std::vector<std::vector<size_t>> null_padded_row_idx_vecs(worker_count,
762  std::vector<size_t>());
763  bool is_parallel_modification = false;
764  std::vector<size_t> null_padded_row_idx_vec;
765  const auto do_work = [&cur_sum_num_tuples,
766  &sum_data_buf_size,
767  &null_padded_first_elem,
768  &idx_buf_ptr](
769  const size_t start,
770  const size_t end,
771  const bool is_parallel_modification,
772  std::vector<size_t>* null_padded_row_idx_vec) {
773  for (size_t i = start; i < end; i++) {
774  if (LIKELY(idx_buf_ptr[cur_sum_num_tuples + i] >= 0)) {
775  if (null_padded_first_elem) {
776  // deal with null padded bytes
777  idx_buf_ptr[cur_sum_num_tuples + i] -=
778  ArrayNoneEncoder::DEFAULT_NULL_PADDING_SIZE;
779  }
780  idx_buf_ptr[cur_sum_num_tuples + i] += sum_data_buf_size;
781  } else {
782  // a null padded row needs to reference the previous row's idx, so in
783  // multi-threaded index modification thread-i may contend with thread-j
784  // when it needs to read a row idx owned by thread-j; we therefore
785  // collect the row idxs of null rows here and fix them up after this
786  // step
787  null_padded_row_idx_vec->push_back(cur_sum_num_tuples + i);
788  }
789  }
790  };
791  if (cur_chunk_num_tuples > g_enable_parallel_linearization) {
792  is_parallel_modification = true;
793  for (auto interval :
794  makeIntervals(size_t(0), cur_chunk_num_tuples, worker_count)) {
795  conversion_threads.push_back(
796  std::async(std::launch::async,
797  do_work,
798  interval.begin,
799  interval.end,
800  is_parallel_modification,
801  &null_padded_row_idx_vecs[interval.index]));
802  }
803  for (auto& child : conversion_threads) {
804  child.wait();
805  }
806  for (auto& v : null_padded_row_idx_vecs) {
807  std::copy(v.begin(), v.end(), std::back_inserter(null_padded_row_idx_vec));
808  }
809  } else {
810  do_work(size_t(0),
811  cur_chunk_num_tuples,
812  is_parallel_modification,
813  &null_padded_row_idx_vec);
814  }
815  if (!null_padded_row_idx_vec.empty()) {
816  // modify each null padded row idx to reference its previous row;
817  // the row idxs are sorted first so that modified idxs propagate correctly
818  std::sort(null_padded_row_idx_vec.begin(), null_padded_row_idx_vec.end());
819  for (auto& padded_null_row_idx : null_padded_row_idx_vec) {
820  if (idx_buf_ptr[padded_null_row_idx - 1] > 0) {
821  idx_buf_ptr[padded_null_row_idx] = -idx_buf_ptr[padded_null_row_idx - 1];
822  } else {
823  idx_buf_ptr[padded_null_row_idx] = idx_buf_ptr[padded_null_row_idx - 1];
824  }
825  }
826  }
827  }
828  }
829  sum_idx_buf_size += idx_buf_size;
830  cur_sum_num_tuples += cur_chunk_num_tuples;
831  sum_data_buf_size += target_chunk_data_buffer->size();
832  if (target_idx_buf_ptr[*chunk_num_tuple_it] < 0) {
833  null_padded_last_val = true;
834  } else {
835  null_padded_last_val = false;
836  }
837  if (null_padded_first_elem) {
838  sum_data_buf_size -= ArrayNoneEncoder::DEFAULT_NULL_PADDING_SIZE;
839  null_padded_first_elem = false; // reset for the next chunk
840  }
841  if (!has_cached_merged_idx_buf && cur_sum_num_tuples == total_num_tuples) {
842  auto merged_index_buffer_ptr =
843  reinterpret_cast<ArrayOffsetT*>(merged_index_buffer_in_cpu->getMemoryPtr());
844  merged_index_buffer_ptr[total_num_tuples] =
845  total_data_buf_size -
846  total_idx_size_modifier; // last index value is total data size;
847  }
848  }
849 
850  // put linearized index buffer to per-device cache
851  AbstractBuffer* merged_index_buffer = nullptr;
852  size_t buf_size = total_idx_buf_size + sizeof(ArrayOffsetT);
853  auto copyBuf =
854  [&device_allocator](
855  int8_t* src, int8_t* dest, size_t buf_size, MemoryLevel memory_level) {
856  if (memory_level == Data_Namespace::CPU_LEVEL) {
857  memcpy((void*)dest, src, buf_size);
858  } else {
859  CHECK(memory_level == Data_Namespace::GPU_LEVEL);
860  device_allocator->copyToDevice(dest, src, buf_size);
861  }
862  };
863  {
864  std::lock_guard<std::mutex> linearized_col_cache_guard(linearized_col_cache_mutex_);
865  auto merged_idx_buf_cache_it = linearized_idx_buf_cache_.find(icd);
866  // for CPU execution, we can use `merged_index_buffer_in_cpu` as is,
867  // but for GPU, we have to copy it to the corresponding device
868  if (memory_level == MemoryLevel::GPU_LEVEL) {
869  if (merged_idx_buf_cache_it != linearized_idx_buf_cache_.end()) {
870  auto& merged_idx_buf_cache = merged_idx_buf_cache_it->second;
871  auto merged_idx_buf_it = merged_idx_buf_cache.find(device_id);
872  if (merged_idx_buf_it != merged_idx_buf_cache.end()) {
873  merged_index_buffer = merged_idx_buf_it->second;
874  } else {
875  merged_index_buffer = cat.getDataMgr().alloc(memory_level, device_id, buf_size);
876  copyBuf(merged_index_buffer_in_cpu->getMemoryPtr(),
877  merged_index_buffer->getMemoryPtr(),
878  buf_size,
879  memory_level);
880  merged_idx_buf_cache.insert(std::make_pair(device_id, merged_index_buffer));
881  }
882  } else {
883  merged_index_buffer = cat.getDataMgr().alloc(memory_level, device_id, buf_size);
884  copyBuf(merged_index_buffer_in_cpu->getMemoryPtr(),
885  merged_index_buffer->getMemoryPtr(),
886  buf_size,
887  memory_level);
888  DeviceMergedChunkMap m;
889  m.insert(std::make_pair(device_id, merged_index_buffer));
890  linearized_idx_buf_cache_.insert(std::make_pair(icd, m));
891  }
892  } else {
893  // `linearlized_temporary_cpu_index_buf_cache_` has this buf
894  merged_index_buffer = merged_index_buffer_in_cpu;
895  }
896  }
897  CHECK(merged_index_buffer);
898  linearization_time_ms += timer_stop(clock_begin);
899  VLOG(2) << "Linearization completed successfully, elapsed time: "
900  << linearization_time_ms << " ms.";
901  return {merged_data_buffer, merged_index_buffer};
902 }
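
To make the index rebasing above concrete, here is a minimal self-contained sketch (hypothetical offsets; not part of ColumnFetcher) of the two rules the merge loop applies: a non-negative offset is shifted by the number of data bytes merged so far, and a negative offset (the null sentinel) is afterwards rewritten to the negation of the previous row's offset so the null marker survives the merge.

  #include <cstdint>
  #include <cstdio>
  #include <vector>

  using ArrayOffsetT = int32_t;

  int main() {
    // Hypothetical index buffer of a second chunk; negative entries mark nulls.
    std::vector<ArrayOffsetT> idx = {0, 8, -8, 16};
    const ArrayOffsetT merged_bytes = 100;  // data bytes already linearized
    for (auto& off : idx) {
      if (off >= 0) {
        off += merged_bytes;  // rule 1: rebase valid offsets
      }
    }
    for (size_t i = 1; i < idx.size(); ++i) {
      if (idx[i] < 0) {  // rule 2: null rows reference the previous row, negated
        idx[i] = idx[i - 1] > 0 ? -idx[i - 1] : idx[i - 1];
      }
    }
    for (auto off : idx) {
      printf("%d\n", off);  // prints 100, 108, -108, 116
    }
    return 0;
  }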

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

JoinColumn ColumnFetcher::makeJoinColumn ( Executor *  executor,
const Analyzer::ColumnVar &  hash_col,
const std::vector< Fragmenter_Namespace::FragmentInfo > &  fragments,
const Data_Namespace::MemoryLevel  effective_mem_lvl,
const int  device_id,
DeviceAllocator *  device_allocator,
const size_t  thread_idx,
std::vector< std::shared_ptr< Chunk_NS::Chunk >> &  chunks_owner,
std::vector< std::shared_ptr< void >> &  malloc_owner,
ColumnCacheMap &  column_cache 
)
static

Creates a JoinColumn struct containing an array of JoinChunk structs.

makeJoinColumn() creates a JoinColumn struct containing an array of JoinChunk structs, col_chunks_buff, malloced in CPU memory. Although the col_chunks_buff array itself is in CPU memory, each JoinChunk struct contains an int8_t* pointer from getOneColumnFragment(), col_buff, that can point to either CPU or GPU memory depending on the effective_mem_lvl parameter. See also the fetchJoinColumn() function, where col_chunks_buff is copied into GPU memory if needed. The malloced array is appended to the malloc_owner parameter, and the fetched chunks are appended to chunks_owner.
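
As a rough illustration only (not code from ColumnFetcher), a CPU-path consumer could walk the returned struct as below, assuming JoinColumn's members mirror the initializer order in the return statement of the listing that follows (col_chunks_buff, col_chunks_buff_sz, num_chunks, num_elems, elem_sz) and that effective_mem_lvl was Data_Namespace::CPU_LEVEL so each col_buff is host-resident.

  // Sketch: iterate the JoinChunk array behind col_chunks_buff.
  const auto* chunks =
      reinterpret_cast<const JoinChunk*>(join_column.col_chunks_buff);
  for (size_t c = 0; c < join_column.num_chunks; ++c) {
    const int8_t* buf = chunks[c].col_buff;  // host pointer on the CPU path
    for (size_t i = 0; i < chunks[c].num_elems; ++i) {
      const int8_t* elem = buf + i * join_column.elem_sz;
      // ... feed `elem` to the hash table builder ...
    }
  }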

Definition at line 155 of file ColumnFetcher.cpp.

References CHECK, CHECK_GT, checked_malloc(), JoinChunk::col_buff, Data_Namespace::CPU_LEVEL, Executor::ERR_INTERRUPTED, g_enable_non_kernel_time_query_interrupt, SQLTypeInfo::get_size(), Analyzer::Expr::get_type_info(), getOneColumnFragment(), and JoinChunk::num_elems.

Referenced by HashJoin::fetchJoinColumn().

165  {
166  CHECK(!fragments.empty());
167 
168  size_t col_chunks_buff_sz = sizeof(struct JoinChunk) * fragments.size();
169  // TODO: needs an allocator owner
170  auto col_chunks_buff = reinterpret_cast<int8_t*>(
171  malloc_owner.emplace_back(checked_malloc(col_chunks_buff_sz), free).get());
172  auto join_chunk_array = reinterpret_cast<struct JoinChunk*>(col_chunks_buff);
173 
174  size_t num_elems = 0;
175  size_t num_chunks = 0;
176  for (auto& frag : fragments) {
177  if (g_enable_non_kernel_time_query_interrupt &&
178  executor->checkNonKernelTimeInterrupted()) {
179  throw QueryExecutionError(Executor::ERR_INTERRUPTED);
180  }
181  auto [col_buff, elem_count] = getOneColumnFragment(
182  executor,
183  hash_col,
184  frag,
185  effective_mem_lvl,
186  effective_mem_lvl == Data_Namespace::CPU_LEVEL ? 0 : device_id,
187  device_allocator,
188  thread_idx,
189  chunks_owner,
190  column_cache);
191  if (col_buff != nullptr) {
192  num_elems += elem_count;
193  join_chunk_array[num_chunks] = JoinChunk{col_buff, elem_count};
194  } else {
195  continue;
196  }
197  ++num_chunks;
198  }
199 
200  int elem_sz = hash_col.get_type_info().get_size();
201  CHECK_GT(elem_sz, 0);
202 
203  return {col_chunks_buff,
204  col_chunks_buff_sz,
205  num_chunks,
206  num_elems,
207  static_cast<size_t>(elem_sz)};
208 }

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

ChunkIter ColumnFetcher::prepareChunkIter ( AbstractBuffer *  merged_data_buf,
AbstractBuffer *  merged_index_buf,
ChunkIter &  chunk_iter,
bool  is_true_varlen_type,
const size_t  total_num_tuples 
) const
private

Definition at line 1045 of file ColumnFetcher.cpp.

References ChunkIter::current_pos, ChunkIter::end_pos, Data_Namespace::AbstractBuffer::getMemoryPtr(), ChunkIter::num_elems, ChunkIter::second_buf, Data_Namespace::AbstractBuffer::size(), ChunkIter::skip, ChunkIter::skip_size, ChunkIter::start_pos, and ChunkIter::type_info.

Referenced by linearizeColumnFragments().

1049  {
1050  ChunkIter merged_chunk_iter;
1051  if (is_true_varlen_type) {
1052  merged_chunk_iter.start_pos = merged_index_buf->getMemoryPtr();
1053  merged_chunk_iter.current_pos = merged_index_buf->getMemoryPtr();
1054  merged_chunk_iter.end_pos = merged_index_buf->getMemoryPtr() +
1055  merged_index_buf->size() - sizeof(StringOffsetT);
1056  merged_chunk_iter.second_buf = merged_data_buf->getMemoryPtr();
1057  } else {
1058  merged_chunk_iter.start_pos = merged_data_buf->getMemoryPtr();
1059  merged_chunk_iter.current_pos = merged_data_buf->getMemoryPtr();
1060  merged_chunk_iter.end_pos = merged_data_buf->getMemoryPtr() + merged_data_buf->size();
1061  merged_chunk_iter.second_buf = nullptr;
1062  }
1063  merged_chunk_iter.num_elems = total_num_tuples;
1064  merged_chunk_iter.skip = chunk_iter.skip;
1065  merged_chunk_iter.skip_size = chunk_iter.skip_size;
1066  merged_chunk_iter.type_info = chunk_iter.type_info;
1067  return merged_chunk_iter;
1068 }
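
A hedged sketch of the call shape from linearizeColumnFragments() (the exact call site is not shown here; the buffer and iterator names are illustrative):

  // Sketch: wrap the merged buffers in a single iterator. Passing true selects
  // the two-buffer layout: the index buffer drives iteration and the data
  // buffer becomes second_buf.
  ChunkIter merged_iter = prepareChunkIter(merged_data_buffer,
                                           merged_index_buffer,
                                           source_chunk_iter,  // supplies skip, skip_size, type_info
                                           /*is_true_varlen_type=*/true,
                                           total_num_tuples);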

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

const int8_t * ColumnFetcher::transferColumnIfNeeded ( const ColumnarResults *  columnar_results,
const int  col_id,
Data_Namespace::DataMgr *  data_mgr,
const Data_Namespace::MemoryLevel  memory_level,
const int  device_id,
DeviceAllocator *  device_allocator 
)
static private

Definition at line 984 of file ColumnFetcher.cpp.

References Allocator::alloc(), CHECK, CHECK_LT, DeviceAllocator::copyToDevice(), ColumnarResults::getColumnBuffers(), ColumnarResults::getColumnType(), Data_Namespace::GPU_LEVEL, and ColumnarResults::size().

Referenced by getAllTableColumnFragments(), getOneColumnFragment(), and getResultSetColumn().

990  {
991  if (!columnar_results) {
992  return nullptr;
993  }
994  const auto& col_buffers = columnar_results->getColumnBuffers();
995  CHECK_LT(static_cast<size_t>(col_id), col_buffers.size());
996  if (memory_level == Data_Namespace::GPU_LEVEL) {
997  const auto& col_ti = columnar_results->getColumnType(col_id);
998  const auto num_bytes = columnar_results->size() * col_ti.get_size();
999  CHECK(device_allocator);
1000  auto gpu_col_buffer = device_allocator->alloc(num_bytes);
1001  device_allocator->copyToDevice(gpu_col_buffer, col_buffers[col_id], num_bytes);
1002  return gpu_col_buffer;
1003  }
1004  return col_buffers[col_id];
1005 }
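
In effect, the helper is a pass-through on the CPU path and a device upload on the GPU path. A hedged sketch of an internal call (this is a private static member, so its callers are the fetch methods listed above; the variable names here are illustrative):

  // Sketch: on GPU_LEVEL the columnar buffer is copied through device_allocator;
  // on CPU_LEVEL the host pointer is returned unchanged.
  const int8_t* col_buff =
      transferColumnIfNeeded(columnar_results_ptr,  // const ColumnarResults*
                             col_id,
                             data_mgr_ptr,          // Data_Namespace::DataMgr*
                             Data_Namespace::GPU_LEVEL,
                             device_id,
                             device_allocator);     // must be non-null for GPU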

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

Friends And Related Function Documentation

friend class QueryCompilationDescriptor
friend

Definition at line 200 of file ColumnFetcher.h.

friend class TableFunctionExecutionContext
friend

Definition at line 201 of file ColumnFetcher.h.

Member Data Documentation

std::mutex ColumnFetcher::chunk_list_mutex_
mutable private

Definition at line 184 of file ColumnFetcher.h.

Referenced by getOneTableColumnFragment(), and linearizeColumnFragments().

std::mutex ColumnFetcher::columnar_fetch_mutex_
mutable private

Definition at line 181 of file ColumnFetcher.h.

Referenced by getAllTableColumnFragments(), and getResultSetColumn().

std::unordered_map<InputColDescriptor, std::unique_ptr<const ColumnarResults> > ColumnFetcher::columnarized_scan_table_cache_
mutable private

Definition at line 188 of file ColumnFetcher.h.

Referenced by getAllTableColumnFragments().

ColumnCacheMap ColumnFetcher::columnarized_table_cache_
mutable private
std::mutex ColumnFetcher::linearization_mutex_
mutable private

Definition at line 183 of file ColumnFetcher.h.

Referenced by linearizeColumnFragments().

std::mutex ColumnFetcher::linearized_col_cache_mutex_
mutable private
std::unordered_map<InputColDescriptor, DeviceMergedChunkMap> ColumnFetcher::linearized_data_buf_cache_
mutable private
std::unordered_map<InputColDescriptor, DeviceMergedChunkMap> ColumnFetcher::linearized_idx_buf_cache_
mutable private

Definition at line 198 of file ColumnFetcher.h.

Referenced by freeLinearizedBuf(), and linearizeVarLenArrayColFrags().

std::unordered_map<InputColDescriptor, DeviceMergedChunkIterMap> ColumnFetcher::linearized_multi_frag_chunk_iter_cache_
mutable private

Definition at line 192 of file ColumnFetcher.h.

Referenced by addMergedChunkIter(), getChunkiter(), and linearizeColumnFragments().

std::unordered_map<int, AbstractBuffer*> ColumnFetcher::linearlized_temporary_cpu_index_buf_cache_
mutable private
std::mutex ColumnFetcher::varlen_chunk_fetch_mutex_
mutable private

Definition at line 182 of file ColumnFetcher.h.

Referenced by getOneTableColumnFragment(), and linearizeColumnFragments().


The documentation for this class was generated from the following files: