OmniSciDB  b28c0d5765
ColumnFetcher Class Reference

#include <ColumnFetcher.h>


Public Member Functions

 ColumnFetcher (Executor *executor, const ColumnCacheMap &column_cache)
 
const int8_t * getOneTableColumnFragment (const int table_id, const int frag_id, const int col_id, const std::map< int, const TableFragments * > &all_tables_fragments, std::list< std::shared_ptr< Chunk_NS::Chunk >> &chunk_holder, std::list< ChunkIter > &chunk_iter_holder, const Data_Namespace::MemoryLevel memory_level, const int device_id, DeviceAllocator *device_allocator) const
 
const int8_t * getAllTableColumnFragments (const int table_id, const int col_id, const std::map< int, const TableFragments * > &all_tables_fragments, const Data_Namespace::MemoryLevel memory_level, const int device_id, DeviceAllocator *device_allocator, const size_t thread_idx) const
 
const int8_t * getResultSetColumn (const InputColDescriptor *col_desc, const Data_Namespace::MemoryLevel memory_level, const int device_id, DeviceAllocator *device_allocator, const size_t thread_idx) const
 
const int8_t * linearizeColumnFragments (const int table_id, const int col_id, const std::map< int, const TableFragments * > &all_tables_fragments, std::list< std::shared_ptr< Chunk_NS::Chunk >> &chunk_holder, std::list< ChunkIter > &chunk_iter_holder, const Data_Namespace::MemoryLevel memory_level, const int device_id, DeviceAllocator *device_allocator, const size_t thread_idx) const
 
void freeTemporaryCpuLinearizedIdxBuf ()
 
void freeLinearizedBuf ()
 

Static Public Member Functions

static std::pair< const int8_t *, size_t > getOneColumnFragment (Executor *executor, const Analyzer::ColumnVar &hash_col, const Fragmenter_Namespace::FragmentInfo &fragment, const Data_Namespace::MemoryLevel effective_mem_lvl, const int device_id, DeviceAllocator *device_allocator, const size_t thread_idx, std::vector< std::shared_ptr< Chunk_NS::Chunk >> &chunks_owner, ColumnCacheMap &column_cache)
 Gets one chunk's pointer and element count on either CPU or GPU. More...
 
static JoinColumn makeJoinColumn (Executor *executor, const Analyzer::ColumnVar &hash_col, const std::vector< Fragmenter_Namespace::FragmentInfo > &fragments, const Data_Namespace::MemoryLevel effective_mem_lvl, const int device_id, DeviceAllocator *device_allocator, const size_t thread_idx, std::vector< std::shared_ptr< Chunk_NS::Chunk >> &chunks_owner, std::vector< std::shared_ptr< void >> &malloc_owner, ColumnCacheMap &column_cache)
 Creates a JoinColumn struct containing an array of JoinChunk structs. More...
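For multi-fragment inputs, makeJoinColumn stitches the per-fragment buffers obtained through getOneColumnFragment into a single JoinColumn. A minimal sketch of a call site follows; executor, hash_col, fragments, and column_cache are assumed to be supplied by the surrounding join-build code and are not defined here:

// Hedged sketch: build a JoinColumn over all fragments of the join key column.
std::vector<std::shared_ptr<Chunk_NS::Chunk>> chunks_owner;  // pins fetched chunks
std::vector<std::shared_ptr<void>> malloc_owner;             // owns CPU scratch memory
JoinColumn join_col =
    ColumnFetcher::makeJoinColumn(executor,
                                  hash_col,   // Analyzer::ColumnVar of the join key
                                  fragments,  // std::vector<FragmentInfo> of the table
                                  Data_Namespace::CPU_LEVEL,
                                  /*device_id=*/0,
                                  /*device_allocator=*/nullptr,
                                  /*thread_idx=*/0,
                                  chunks_owner,
                                  malloc_owner,
                                  column_cache);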
 

Private Types

using DeviceMergedChunkIterMap = std::unordered_map< int, int8_t * >
 
using DeviceMergedChunkMap = std::unordered_map< int, AbstractBuffer * >
 

Private Member Functions

MergedChunk linearizeVarLenArrayColFrags (const Catalog_Namespace::Catalog &cat, std::list< std::shared_ptr< Chunk_NS::Chunk >> &chunk_holder, std::list< ChunkIter > &chunk_iter_holder, std::list< std::shared_ptr< Chunk_NS::Chunk >> &local_chunk_holder, std::list< ChunkIter > &local_chunk_iter_holder, std::list< size_t > &local_chunk_num_tuples, MemoryLevel memory_level, const ColumnDescriptor *cd, const int device_id, const size_t total_data_buf_size, const size_t total_idx_buf_size, const size_t total_num_tuples, DeviceAllocator *device_allocator, const size_t thread_idx) const
 
MergedChunk linearizeFixedLenArrayColFrags (const Catalog_Namespace::Catalog &cat, std::list< std::shared_ptr< Chunk_NS::Chunk >> &chunk_holder, std::list< ChunkIter > &chunk_iter_holder, std::list< std::shared_ptr< Chunk_NS::Chunk >> &local_chunk_holder, std::list< ChunkIter > &local_chunk_iter_holder, std::list< size_t > &local_chunk_num_tuples, MemoryLevel memory_level, const ColumnDescriptor *cd, const int device_id, const size_t total_data_buf_size, const size_t total_idx_buf_size, const size_t total_num_tuples, DeviceAllocator *device_allocator, const size_t thread_idx) const
 
void addMergedChunkIter (const InputColDescriptor col_desc, const int device_id, int8_t *chunk_iter_ptr) const
 
const int8_t * getChunkiter (const InputColDescriptor col_desc, const int device_id=0) const
 
ChunkIter prepareChunkIter (AbstractBuffer *merged_data_buf, AbstractBuffer *merged_index_buf, ChunkIter &chunk_iter, bool is_true_varlen_type, const size_t total_num_tuples) const
 
const int8_t * getResultSetColumn (const ResultSetPtr &buffer, const int table_id, const int col_id, const Data_Namespace::MemoryLevel memory_level, const int device_id, DeviceAllocator *device_allocator, const size_t thread_idx) const
 

Static Private Member Functions

static const int8_t * transferColumnIfNeeded (const ColumnarResults *columnar_results, const int col_id, Data_Namespace::DataMgr *data_mgr, const Data_Namespace::MemoryLevel memory_level, const int device_id, DeviceAllocator *device_allocator)
 

Private Attributes

Executor * executor_
 
std::mutex columnar_fetch_mutex_
 
std::mutex varlen_chunk_fetch_mutex_
 
std::mutex linearization_mutex_
 
std::mutex chunk_list_mutex_
 
std::mutex linearized_col_cache_mutex_
 
ColumnCacheMap columnarized_table_cache_
 
std::unordered_map< InputColDescriptor, std::unique_ptr< const ColumnarResults > > columnarized_scan_table_cache_
 
std::unordered_map< InputColDescriptor, DeviceMergedChunkIterMap > linearized_multi_frag_chunk_iter_cache_
 
std::unordered_map< int, AbstractBuffer * > linearlized_temporary_cpu_index_buf_cache_
 
std::unordered_map< InputColDescriptor, DeviceMergedChunkMap > linearized_data_buf_cache_
 
std::unordered_map< InputColDescriptor, DeviceMergedChunkMap > linearized_idx_buf_cache_
 

Friends

class QueryCompilationDescriptor
 
class TableFunctionExecutionContext
 

Detailed Description

Definition at line 49 of file ColumnFetcher.h.
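The class is constructed once per query step with an Executor and a shared ColumnCacheMap, and it serializes columnar conversion, chunk fetching, and linearization through the mutexes listed above. A minimal usage sketch follows; executor, table_id, col_id, and all_tables_fragments are assumed to come from the surrounding query state, so this is illustrative rather than verbatim caller code:

// Hedged sketch: fetch fragment 0 of a physical column at CPU memory level.
ColumnCacheMap column_cache;
ColumnFetcher column_fetcher(executor, column_cache);

std::list<std::shared_ptr<Chunk_NS::Chunk>> chunk_holder;  // keeps fetched chunks alive
std::list<ChunkIter> chunk_iter_holder;                    // keeps varlen iterators alive

const int8_t* frag_buf =
    column_fetcher.getOneTableColumnFragment(table_id,
                                             /*frag_id=*/0,
                                             col_id,
                                             all_tables_fragments,
                                             chunk_holder,
                                             chunk_iter_holder,
                                             Data_Namespace::CPU_LEVEL,
                                             /*device_id=*/0,
                                             /*device_allocator=*/nullptr);
// For varlen columns the returned pointer addresses a ChunkIter; for
// fixed-width columns it addresses the chunk's data buffer directly.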

Member Typedef Documentation

using ColumnFetcher::DeviceMergedChunkIterMap = std::unordered_map<int, int8_t*>
private

Definition at line 189 of file ColumnFetcher.h.

using ColumnFetcher::DeviceMergedChunkMap = std::unordered_map<int, AbstractBuffer*>
private

Definition at line 190 of file ColumnFetcher.h.

Constructor & Destructor Documentation

ColumnFetcher::ColumnFetcher ( Executor *  executor,
const ColumnCacheMap &  column_cache 
)

Definition at line 61 of file ColumnFetcher.cpp.

62  : executor_(executor), columnarized_table_cache_(column_cache) {}

Member Function Documentation

void ColumnFetcher::addMergedChunkIter ( const InputColDescriptor  col_desc,
const int  device_id,
int8_t *  chunk_iter_ptr 
) const
private

Definition at line 1010 of file ColumnFetcher.cpp.

References InputColDescriptor::getColId(), InputColDescriptor::getScanDesc(), InputDescriptor::getTableId(), linearized_col_cache_mutex_, linearized_multi_frag_chunk_iter_cache_, and VLOG.

Referenced by linearizeColumnFragments().

1012  {
1013  std::lock_guard<std::mutex> linearize_guard(linearized_col_cache_mutex_);
1014  auto chunk_iter_it = linearized_multi_frag_chunk_iter_cache_.find(col_desc);
1015  if (chunk_iter_it != linearized_multi_frag_chunk_iter_cache_.end()) {
1016  auto iter_device_it = chunk_iter_it->second.find(device_id);
1017  if (iter_device_it == chunk_iter_it->second.end()) {
1018  VLOG(2) << "Additional merged chunk_iter for col_desc (tbl: "
1019  << col_desc.getScanDesc().getTableId() << ", col: " << col_desc.getColId()
1020  << "), device_id: " << device_id;
1021  chunk_iter_it->second.emplace(device_id, chunk_iter_ptr);
1022  }
1023  } else {
1024  DeviceMergedChunkIterMap iter_m;
1025  iter_m.emplace(device_id, chunk_iter_ptr);
1026  VLOG(2) << "New merged chunk_iter for col_desc (tbl: "
1027  << col_desc.getScanDesc().getTableId() << ", col: " << col_desc.getColId()
1028  << "), device_id: " << device_id;
1029  linearized_multi_frag_chunk_iter_cache_.emplace(col_desc, iter_m);
1030  }
1031 }

void ColumnFetcher::freeLinearizedBuf ( )

Definition at line 1073 of file ColumnFetcher.cpp.

References cat(), executor_, linearized_col_cache_mutex_, linearized_data_buf_cache_, and linearized_idx_buf_cache_.

Referenced by Executor::executeWorkUnitImpl().

1073  {
1074  std::lock_guard<std::mutex> linearized_col_cache_guard(linearized_col_cache_mutex_);
1075  const auto& cat = *executor_->getCatalog();
1076  auto& data_mgr = cat.getDataMgr();
1077 
1078  if (!linearized_data_buf_cache_.empty()) {
1079  for (auto& kv : linearized_data_buf_cache_) {
1080  for (auto& kv2 : kv.second) {
1081  data_mgr.free(kv2.second);
1082  }
1083  }
1084  }
1085 
1086  if (!linearized_idx_buf_cache_.empty()) {
1087  for (auto& kv : linearized_idx_buf_cache_) {
1088  for (auto& kv2 : kv.second) {
1089  data_mgr.free(kv2.second);
1090  }
1091  }
1092  }
1093 }

void ColumnFetcher::freeTemporaryCpuLinearizedIdxBuf ( )

Definition at line 1095 of file ColumnFetcher.cpp.

References cat(), executor_, linearized_col_cache_mutex_, and linearlized_temporary_cpu_index_buf_cache_.

Referenced by Executor::executeWorkUnitImpl().

1095  {
1096  std::lock_guard<std::mutex> linearized_col_cache_guard(linearized_col_cache_mutex_);
1097  const auto& cat = *executor_->getCatalog();
1098  auto& data_mgr = cat.getDataMgr();
1099  if (!linearlized_temporary_cpu_index_buf_cache_.empty()) {
1100  for (auto& kv : linearlized_temporary_cpu_index_buf_cache_) {
1101  data_mgr.free(kv.second);
1102  }
1103  }
1104 }

const int8_t * ColumnFetcher::getAllTableColumnFragments ( const int  table_id,
const int  col_id,
const std::map< int, const TableFragments * > &  all_tables_fragments,
const Data_Namespace::MemoryLevel  memory_level,
const int  device_id,
DeviceAllocator *  device_allocator,
const size_t  thread_idx 
) const

Definition at line 285 of file ColumnFetcher.cpp.

References CHECK, columnar_fetch_mutex_, columnarized_scan_table_cache_, Data_Namespace::CPU_LEVEL, Executor::ERR_INTERRUPTED, executor_, g_enable_non_kernel_time_query_interrupt, getOneTableColumnFragment(), InputColDescriptor::getScanDesc(), InputDescriptor::getSourceType(), ColumnarResults::mergeResults(), TABLE, and transferColumnIfNeeded().

Referenced by Executor::fetchChunks(), and Executor::fetchUnionChunks().

292  {
293  const auto fragments_it = all_tables_fragments.find(table_id);
294  CHECK(fragments_it != all_tables_fragments.end());
295  const auto fragments = fragments_it->second;
296  const auto frag_count = fragments->size();
297  std::vector<std::unique_ptr<ColumnarResults>> column_frags;
298  const ColumnarResults* table_column = nullptr;
299  const InputColDescriptor col_desc(col_id, table_id, int(0));
300  CHECK(col_desc.getScanDesc().getSourceType() == InputSourceType::TABLE);
301  {
302  std::lock_guard<std::mutex> columnar_conversion_guard(columnar_fetch_mutex_);
303  auto column_it = columnarized_scan_table_cache_.find(col_desc);
304  if (column_it == columnarized_scan_table_cache_.end()) {
305  for (size_t frag_id = 0; frag_id < frag_count; ++frag_id) {
306  if (g_enable_non_kernel_time_query_interrupt &&
307  executor_->checkNonKernelTimeInterrupted()) {
308  throw QueryExecutionError(Executor::ERR_INTERRUPTED);
309  }
310  std::list<std::shared_ptr<Chunk_NS::Chunk>> chunk_holder;
311  std::list<ChunkIter> chunk_iter_holder;
312  const auto& fragment = (*fragments)[frag_id];
313  if (fragment.isEmptyPhysicalFragment()) {
314  continue;
315  }
316  auto chunk_meta_it = fragment.getChunkMetadataMap().find(col_id);
317  CHECK(chunk_meta_it != fragment.getChunkMetadataMap().end());
318  auto col_buffer = getOneTableColumnFragment(table_id,
319  static_cast<int>(frag_id),
320  col_id,
321  all_tables_fragments,
322  chunk_holder,
323  chunk_iter_holder,
324  Data_Namespace::CPU_LEVEL,
325  int(0),
326  device_allocator);
327  column_frags.push_back(
328  std::make_unique<ColumnarResults>(executor_->row_set_mem_owner_,
329  col_buffer,
330  fragment.getNumTuples(),
331  chunk_meta_it->second->sqlType,
332  executor_->executor_id_,
333  thread_idx));
334  }
335  auto merged_results =
336  ColumnarResults::mergeResults(executor_->row_set_mem_owner_, column_frags);
337  table_column = merged_results.get();
338  columnarized_scan_table_cache_.emplace(col_desc, std::move(merged_results));
339  } else {
340  table_column = column_it->second.get();
341  }
342  }
343  return ColumnFetcher::transferColumnIfNeeded(table_column,
344  0,
345  executor_->getDataMgr(),
346  memory_level,
347  device_id,
348  device_allocator);
349 }

const int8_t * ColumnFetcher::getChunkiter ( const InputColDescriptor  col_desc,
const int  device_id = 0 
) const
private

Definition at line 1033 of file ColumnFetcher.cpp.

References InputColDescriptor::getColId(), InputColDescriptor::getScanDesc(), InputDescriptor::getTableId(), linearized_multi_frag_chunk_iter_cache_, and VLOG.

Referenced by linearizeColumnFragments().

1034  {
1035  auto linearized_chunk_iter_it = linearized_multi_frag_chunk_iter_cache_.find(col_desc);
1036  if (linearized_chunk_iter_it != linearized_multi_frag_chunk_iter_cache_.end()) {
1037  auto dev_iter_map_it = linearized_chunk_iter_it->second.find(device_id);
1038  if (dev_iter_map_it != linearized_chunk_iter_it->second.end()) {
1039  VLOG(2) << "Recycle merged chunk_iter for col_desc (tbl: "
1040  << col_desc.getScanDesc().getTableId() << ", col: " << col_desc.getColId()
1041  << "), device_id: " << device_id;
1042  return dev_iter_map_it->second;
1043  }
1044  }
1045  return nullptr;
1046 }

std::pair< const int8_t *, size_t > ColumnFetcher::getOneColumnFragment ( Executor *  executor,
const Analyzer::ColumnVar &  hash_col,
const Fragmenter_Namespace::FragmentInfo &  fragment,
const Data_Namespace::MemoryLevel  effective_mem_lvl,
const int  device_id,
DeviceAllocator *  device_allocator,
const size_t  thread_idx,
std::vector< std::shared_ptr< Chunk_NS::Chunk >> &  chunks_owner,
ColumnCacheMap &  column_cache 
)
static

Gets one chunk's pointer and element count on either CPU or GPU.

Gets a column fragment chunk on CPU or on GPU depending on the effective memory level parameter. For temporary tables, the chunk will be copied to the GPU if needed. Returns a buffer pointer and an element count.
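As a sketch of a call site, a join-build path could fetch the key column of one fragment as below; hash_col, fragment, and column_cache are assumed to be provided by the caller (illustrative only, not verbatim from the callers referenced below):

// Hedged sketch: fetch one fragment of the join key column on CPU.
std::vector<std::shared_ptr<Chunk_NS::Chunk>> chunks_owner;  // pins the fetched chunk
auto [col_buff, num_elems] =
    ColumnFetcher::getOneColumnFragment(executor,
                                        hash_col,   // Analyzer::ColumnVar of the key
                                        fragment,   // one FragmentInfo of the table
                                        Data_Namespace::CPU_LEVEL,
                                        /*device_id=*/0,
                                        /*device_allocator=*/nullptr,
                                        /*thread_idx=*/0,
                                        chunks_owner,
                                        column_cache);
if (!col_buff) {
  // An empty physical fragment yields {nullptr, 0} and can be skipped.
}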

Definition at line 67 of file ColumnFetcher.cpp.

References CHECK, anonymous_namespace{ColumnFetcher.cpp}::columnarize_result(), Data_Namespace::CPU_LEVEL, DEBUG_TIMER, Fragmenter_Namespace::FragmentInfo::fragmentId, get_column_descriptor_maybe(), Analyzer::ColumnVar::get_column_id(), Analyzer::ColumnVar::get_table_id(), get_temporary_table(), Chunk_NS::Chunk::getChunk(), Fragmenter_Namespace::FragmentInfo::getChunkMetadataMap(), Fragmenter_Namespace::FragmentInfo::getNumTuples(), Fragmenter_Namespace::FragmentInfo::isEmptyPhysicalFragment(), Fragmenter_Namespace::FragmentInfo::physicalTableId, and transferColumnIfNeeded().

Referenced by RelAlgExecutor::createWindowFunctionContext(), TableFunctionExecutionContext::execute(), and makeJoinColumn().

76  {
77  static std::mutex columnar_conversion_mutex;
78  auto timer = DEBUG_TIMER(__func__);
79  if (fragment.isEmptyPhysicalFragment()) {
80  return {nullptr, 0};
81  }
82  const auto table_id = hash_col.get_table_id();
83  const auto& catalog = *executor->getCatalog();
84  const auto cd =
85  get_column_descriptor_maybe(hash_col.get_column_id(), table_id, catalog);
86  CHECK(!cd || !(cd->isVirtualCol));
87  const int8_t* col_buff = nullptr;
88  if (cd) { // real table
89  /* chunk_meta_it is used here to retrieve chunk numBytes and
90  numElements. Apparently, their values are often zeros. If we
91  knew how to predict the zero values, calling
92  getChunkMetadataMap could be avoided to skip
93  synthesize_metadata calls. */
94  auto chunk_meta_it = fragment.getChunkMetadataMap().find(hash_col.get_column_id());
95  CHECK(chunk_meta_it != fragment.getChunkMetadataMap().end());
96  ChunkKey chunk_key{catalog.getCurrentDB().dbId,
97  fragment.physicalTableId,
98  hash_col.get_column_id(),
99  fragment.fragmentId};
100  const auto chunk = Chunk_NS::Chunk::getChunk(
101  cd,
102  &catalog.getDataMgr(),
103  chunk_key,
104  effective_mem_lvl,
105  effective_mem_lvl == Data_Namespace::CPU_LEVEL ? 0 : device_id,
106  chunk_meta_it->second->numBytes,
107  chunk_meta_it->second->numElements);
108  chunks_owner.push_back(chunk);
109  CHECK(chunk);
110  auto ab = chunk->getBuffer();
111  CHECK(ab->getMemoryPtr());
112  col_buff = reinterpret_cast<int8_t*>(ab->getMemoryPtr());
113  } else { // temporary table
114  const ColumnarResults* col_frag{nullptr};
115  {
116  std::lock_guard<std::mutex> columnar_conversion_guard(columnar_conversion_mutex);
117  const auto frag_id = fragment.fragmentId;
118  if (column_cache.empty() || !column_cache.count(table_id)) {
119  column_cache.insert(std::make_pair(
120  table_id, std::unordered_map<int, std::shared_ptr<const ColumnarResults>>()));
121  }
122  auto& frag_id_to_result = column_cache[table_id];
123  if (frag_id_to_result.empty() || !frag_id_to_result.count(frag_id)) {
124  frag_id_to_result.insert(
125  std::make_pair(frag_id,
126  std::shared_ptr<const ColumnarResults>(columnarize_result(
127  executor->row_set_mem_owner_,
128  get_temporary_table(executor->temporary_tables_, table_id),
129  executor->executor_id_,
130  thread_idx,
131  frag_id))));
132  }
133  col_frag = column_cache[table_id][frag_id].get();
134  }
135  col_buff = transferColumnIfNeeded(
136  col_frag,
137  hash_col.get_column_id(),
138  &catalog.getDataMgr(),
139  effective_mem_lvl,
140  effective_mem_lvl == Data_Namespace::CPU_LEVEL ? 0 : device_id,
141  device_allocator);
142  }
143  return {col_buff, fragment.getNumTuples()};
144 }

const int8_t * ColumnFetcher::getOneTableColumnFragment ( const int  table_id,
const int  frag_id,
const int  col_id,
const std::map< int, const TableFragments * > &  all_tables_fragments,
std::list< std::shared_ptr< Chunk_NS::Chunk >> &  chunk_holder,
std::list< ChunkIter > &  chunk_iter_holder,
const Data_Namespace::MemoryLevel  memory_level,
const int  device_id,
DeviceAllocator *  device_allocator 
) const

Definition at line 210 of file ColumnFetcher.cpp.

References Allocator::alloc(), cat(), CHECK, CHECK_EQ, CHECK_GT, chunk_list_mutex_, DeviceAllocator::copyToDevice(), Data_Namespace::CPU_LEVEL, executor_, get_column_descriptor(), get_column_type(), Chunk_NS::Chunk::getChunk(), Data_Namespace::GPU_LEVEL, kENCODING_NONE, and varlen_chunk_fetch_mutex_.

Referenced by Executor::fetchChunks(), Executor::fetchUnionChunks(), and getAllTableColumnFragments().

219  {
220  const auto fragments_it = all_tables_fragments.find(table_id);
221  CHECK(fragments_it != all_tables_fragments.end());
222  const auto fragments = fragments_it->second;
223  const auto& fragment = (*fragments)[frag_id];
224  if (fragment.isEmptyPhysicalFragment()) {
225  return nullptr;
226  }
227  std::shared_ptr<Chunk_NS::Chunk> chunk;
228  auto chunk_meta_it = fragment.getChunkMetadataMap().find(col_id);
229  CHECK(chunk_meta_it != fragment.getChunkMetadataMap().end());
230  CHECK(table_id > 0);
231  const auto& cat = *executor_->getCatalog();
232  auto cd = get_column_descriptor(col_id, table_id, cat);
233  CHECK(cd);
234  const auto col_type =
235  get_column_type(col_id, table_id, cd, executor_->temporary_tables_);
236  const bool is_real_string =
237  col_type.is_string() && col_type.get_compression() == kENCODING_NONE;
238  const bool is_varlen =
239  is_real_string ||
240  col_type.is_array(); // TODO: should it be col_type.is_varlen_array() ?
241  {
242  ChunkKey chunk_key{
243  cat.getCurrentDB().dbId, fragment.physicalTableId, col_id, fragment.fragmentId};
244  std::unique_ptr<std::lock_guard<std::mutex>> varlen_chunk_lock;
245  if (is_varlen) {
246  varlen_chunk_lock.reset(new std::lock_guard<std::mutex>(varlen_chunk_fetch_mutex_));
247  }
248  chunk = Chunk_NS::Chunk::getChunk(
249  cd,
250  &cat.getDataMgr(),
251  chunk_key,
252  memory_level,
253  memory_level == Data_Namespace::CPU_LEVEL ? 0 : device_id,
254  chunk_meta_it->second->numBytes,
255  chunk_meta_it->second->numElements);
256  std::lock_guard<std::mutex> chunk_list_lock(chunk_list_mutex_);
257  chunk_holder.push_back(chunk);
258  }
259  if (is_varlen) {
260  CHECK_GT(table_id, 0);
261  CHECK(chunk_meta_it != fragment.getChunkMetadataMap().end());
262  chunk_iter_holder.push_back(chunk->begin_iterator(chunk_meta_it->second));
263  auto& chunk_iter = chunk_iter_holder.back();
264  if (memory_level == Data_Namespace::CPU_LEVEL) {
265  return reinterpret_cast<int8_t*>(&chunk_iter);
266  } else {
267  auto ab = chunk->getBuffer();
268  ab->pin();
269  auto& row_set_mem_owner = executor_->getRowSetMemoryOwner();
270  row_set_mem_owner->addVarlenInputBuffer(ab);
271  CHECK_EQ(Data_Namespace::GPU_LEVEL, memory_level);
272  CHECK(allocator);
273  auto chunk_iter_gpu = allocator->alloc(sizeof(ChunkIter));
274  allocator->copyToDevice(
275  chunk_iter_gpu, reinterpret_cast<int8_t*>(&chunk_iter), sizeof(ChunkIter));
276  return chunk_iter_gpu;
277  }
278  } else {
279  auto ab = chunk->getBuffer();
280  CHECK(ab->getMemoryPtr());
281  return ab->getMemoryPtr(); // @TODO(alex) change to use ChunkIter
282  }
283 }

const int8_t * ColumnFetcher::getResultSetColumn ( const InputColDescriptor *  col_desc,
const Data_Namespace::MemoryLevel  memory_level,
const int  device_id,
DeviceAllocator *  device_allocator,
const size_t  thread_idx 
) const

Definition at line 351 of file ColumnFetcher.cpp.

References CHECK, executor_, get_temporary_table(), InputColDescriptor::getColId(), InputColDescriptor::getScanDesc(), and InputDescriptor::getTableId().

Referenced by Executor::fetchChunks(), and Executor::fetchUnionChunks().

356  {
357  CHECK(col_desc);
358  const auto table_id = col_desc->getScanDesc().getTableId();
359  return getResultSetColumn(get_temporary_table(executor_->temporary_tables_, table_id),
360  table_id,
361  col_desc->getColId(),
362  memory_level,
363  device_id,
364  device_allocator,
365  thread_idx);
366 }

const int8_t * ColumnFetcher::getResultSetColumn ( const ResultSetPtr &  buffer,
const int  table_id,
const int  col_id,
const Data_Namespace::MemoryLevel  memory_level,
const int  device_id,
DeviceAllocator *  device_allocator,
const size_t  thread_idx 
) const
private

Definition at line 1106 of file ColumnFetcher.cpp.

References CHECK_GE, CHECK_NE, columnar_fetch_mutex_, anonymous_namespace{ColumnFetcher.cpp}::columnarize_result(), columnarized_table_cache_, executor_, run_benchmark_import::result, and transferColumnIfNeeded().

1113  {
1114  const ColumnarResults* result{nullptr};
1115  {
1116  std::lock_guard<std::mutex> columnar_conversion_guard(columnar_fetch_mutex_);
1117  if (columnarized_table_cache_.empty() || !columnarized_table_cache_.count(table_id)) {
1118  columnarized_table_cache_.insert(std::make_pair(
1119  table_id, std::unordered_map<int, std::shared_ptr<const ColumnarResults>>()));
1120  }
1121  auto& frag_id_to_result = columnarized_table_cache_[table_id];
1122  int frag_id = 0;
1123  if (frag_id_to_result.empty() || !frag_id_to_result.count(frag_id)) {
1124  frag_id_to_result.insert(
1125  std::make_pair(frag_id,
1126  std::shared_ptr<const ColumnarResults>(
1127  columnarize_result(executor_->row_set_mem_owner_,
1128  buffer,
1129  executor_->executor_id_,
1130  thread_idx,
1131  frag_id))));
1132  }
1133  CHECK_NE(size_t(0), columnarized_table_cache_.count(table_id));
1134  result = columnarized_table_cache_[table_id][frag_id].get();
1135  }
1136  CHECK_GE(col_id, 0);
1137  return transferColumnIfNeeded(
1138  result, col_id, executor_->getDataMgr(), memory_level, device_id, device_allocator);
1139 }

const int8_t * ColumnFetcher::linearizeColumnFragments ( const int  table_id,
const int  col_id,
const std::map< int, const TableFragments * > &  all_tables_fragments,
std::list< std::shared_ptr< Chunk_NS::Chunk >> &  chunk_holder,
std::list< ChunkIter > &  chunk_iter_holder,
const Data_Namespace::MemoryLevel  memory_level,
const int  device_id,
DeviceAllocator *  device_allocator,
const size_t  thread_idx 
) const

Definition at line 368 of file ColumnFetcher.cpp.

References addMergedChunkIter(), Allocator::alloc(), cat(), CHECK, CHECK_EQ, CHECK_GT, chunk_list_mutex_, DeviceAllocator::copyToDevice(), Data_Namespace::CPU_LEVEL, DEBUG_TIMER, executor_, get_column_descriptor(), Chunk_NS::Chunk::getChunk(), getChunkiter(), anonymous_namespace{ColumnFetcher.cpp}::getMemoryLevelString(), InputColDescriptor::getScanDesc(), InputDescriptor::getSourceType(), Data_Namespace::GPU_LEVEL, linearization_mutex_, linearized_col_cache_mutex_, linearized_multi_frag_chunk_iter_cache_, linearizeFixedLenArrayColFrags(), linearizeVarLenArrayColFrags(), prepareChunkIter(), run_benchmark_import::res, TABLE, varlen_chunk_fetch_mutex_, and VLOG.

Referenced by Executor::fetchChunks().

377  {
378  auto timer = DEBUG_TIMER(__func__);
379  const auto fragments_it = all_tables_fragments.find(table_id);
380  CHECK(fragments_it != all_tables_fragments.end());
381  const auto fragments = fragments_it->second;
382  const auto frag_count = fragments->size();
383  const InputColDescriptor col_desc(col_id, table_id, int(0));
384  const auto& cat = *executor_->getCatalog();
385  auto cd = get_column_descriptor(col_id, table_id, cat);
386  CHECK(cd);
387  CHECK(col_desc.getScanDesc().getSourceType() == InputSourceType::TABLE);
388  CHECK_GT(table_id, 0);
389  bool is_varlen_chunk = cd->columnType.is_varlen() && !cd->columnType.is_fixlen_array();
390  size_t total_num_tuples = 0;
391  size_t total_data_buf_size = 0;
392  size_t total_idx_buf_size = 0;
393  {
394  std::lock_guard<std::mutex> linearize_guard(linearized_col_cache_mutex_);
395  auto linearized_iter_it = linearized_multi_frag_chunk_iter_cache_.find(col_desc);
396  if (linearized_iter_it != linearized_multi_frag_chunk_iter_cache_.end()) {
397  if (memory_level == CPU_LEVEL) {
398  // in CPU execution, each kernel can share merged chunk since they operates in the
399  // same memory space, so we can share the same chunk iter among kernels
400  return getChunkiter(col_desc, 0);
401  } else {
402  // in GPU execution, this becomes the matter when we deploy multi-GPUs
403  // so we only share the chunk_iter iff kernels are launched on the same GPU device
404  // otherwise we need to separately load merged chunk and its iter
405  // todo(yoonmin): D2D copy of merged chunk and its iter?
406  if (linearized_iter_it->second.find(device_id) !=
407  linearized_iter_it->second.end()) {
408  // note that cached chunk_iter is located on CPU memory space...
409  // we just need to copy it to each device
410  // chunk_iter already contains correct buffer addr depending on execution device
411  auto chunk_iter_gpu = device_allocator->alloc(sizeof(ChunkIter));
412  device_allocator->copyToDevice(
413  chunk_iter_gpu, getChunkiter(col_desc, device_id), sizeof(ChunkIter));
414  return chunk_iter_gpu;
415  }
416  }
417  }
418  }
419 
420  // collect target fragments
421  // basically we load chunk in CPU first, and do necessary manipulation
422  // to make semantics of a merged chunk correctly
423  std::shared_ptr<Chunk_NS::Chunk> chunk;
424  std::list<std::shared_ptr<Chunk_NS::Chunk>> local_chunk_holder;
425  std::list<ChunkIter> local_chunk_iter_holder;
426  std::list<size_t> local_chunk_num_tuples;
427  {
428  std::lock_guard<std::mutex> linearize_guard(varlen_chunk_fetch_mutex_);
429  for (size_t frag_id = 0; frag_id < frag_count; ++frag_id) {
430  const auto& fragment = (*fragments)[frag_id];
431  if (fragment.isEmptyPhysicalFragment()) {
432  continue;
433  }
434  auto chunk_meta_it = fragment.getChunkMetadataMap().find(col_id);
435  CHECK(chunk_meta_it != fragment.getChunkMetadataMap().end());
436  ChunkKey chunk_key{
437  cat.getCurrentDB().dbId, fragment.physicalTableId, col_id, fragment.fragmentId};
438  chunk = Chunk_NS::Chunk::getChunk(cd,
439  &cat.getDataMgr(),
440  chunk_key,
441  Data_Namespace::CPU_LEVEL,
442  0,
443  chunk_meta_it->second->numBytes,
444  chunk_meta_it->second->numElements);
445  local_chunk_holder.push_back(chunk);
446  auto chunk_iter = chunk->begin_iterator(chunk_meta_it->second);
447  local_chunk_iter_holder.push_back(chunk_iter);
448  local_chunk_num_tuples.push_back(fragment.getNumTuples());
449  total_num_tuples += fragment.getNumTuples();
450  total_data_buf_size += chunk->getBuffer()->size();
451  std::ostringstream oss;
452  oss << "Load chunk for col_name: " << chunk->getColumnDesc()->columnName
453  << ", col_id: " << chunk->getColumnDesc()->columnId << ", Frag-" << frag_id
454  << ", numTuples: " << fragment.getNumTuples()
455  << ", data_size: " << chunk->getBuffer()->size();
456  if (chunk->getIndexBuf()) {
457  auto idx_buf_size = chunk->getIndexBuf()->size() - sizeof(ArrayOffsetT);
458  oss << ", index_size: " << idx_buf_size;
459  total_idx_buf_size += idx_buf_size;
460  }
461  VLOG(2) << oss.str();
462  }
463  }
464 
465  auto& col_ti = cd->columnType;
466  MergedChunk res{nullptr, nullptr};
467  // Do linearize multi-fragmented column depending on column type
468  // We cover array and non-encoded text columns
469  // Note that geo column is actually organized as a set of arrays
470  // and each geo object has different set of vectors that they require
471  // Here, we linearize each array at a time, so eventually the geo object has a set of
472  // "linearized" arrays
473  {
474  std::lock_guard<std::mutex> linearization_guard(linearization_mutex_);
475  if (col_ti.is_array()) {
476  if (col_ti.is_fixlen_array()) {
477  VLOG(2) << "Linearize fixed-length multi-frag array column (col_id: "
478  << cd->columnId << ", col_name: " << cd->columnName
479  << ", device_type: " << getMemoryLevelString(memory_level)
480  << ", device_id: " << device_id << "): " << cd->columnType.to_string();
481  res = linearizeFixedLenArrayColFrags(cat,
482  chunk_holder,
483  chunk_iter_holder,
484  local_chunk_holder,
485  local_chunk_iter_holder,
486  local_chunk_num_tuples,
487  memory_level,
488  cd,
489  device_id,
490  total_data_buf_size,
491  total_idx_buf_size,
492  total_num_tuples,
493  device_allocator,
494  thread_idx);
495  } else {
496  CHECK(col_ti.is_varlen_array());
497  VLOG(2) << "Linearize variable-length multi-frag array column (col_id: "
498  << cd->columnId << ", col_name: " << cd->columnName
499  << ", device_type: " << getMemoryLevelString(memory_level)
500  << ", device_id: " << device_id << "): " << cd->columnType.to_string();
501  res = linearizeVarLenArrayColFrags(cat,
502  chunk_holder,
503  chunk_iter_holder,
504  local_chunk_holder,
505  local_chunk_iter_holder,
506  local_chunk_num_tuples,
507  memory_level,
508  cd,
509  device_id,
510  total_data_buf_size,
511  total_idx_buf_size,
512  total_num_tuples,
513  device_allocator,
514  thread_idx);
515  }
516  }
517  if (col_ti.is_string() && !col_ti.is_dict_encoded_string()) {
518  VLOG(2) << "Linearize variable-length multi-frag non-encoded text column (col_id: "
519  << cd->columnId << ", col_name: " << cd->columnName
520  << ", device_type: " << getMemoryLevelString(memory_level)
521  << ", device_id: " << device_id << "): " << cd->columnType.to_string();
522  res = linearizeVarLenArrayColFrags(cat,
523  chunk_holder,
524  chunk_iter_holder,
525  local_chunk_holder,
526  local_chunk_iter_holder,
527  local_chunk_num_tuples,
528  memory_level,
529  cd,
530  device_id,
531  total_data_buf_size,
532  total_idx_buf_size,
533  total_num_tuples,
534  device_allocator,
535  thread_idx);
536  }
537  }
538  CHECK(res.first); // check merged data buffer
539  if (!col_ti.is_fixlen_array()) {
540  CHECK(res.second); // check merged index buffer
541  }
542  auto merged_data_buffer = res.first;
543  auto merged_index_buffer = res.second;
544 
545  // prepare ChunkIter for the linearized chunk
546  auto merged_chunk = std::make_shared<Chunk_NS::Chunk>(
547  merged_data_buffer, merged_index_buffer, cd, false);
548  // to prepare chunk_iter for the merged chunk, we pass one of local chunk iter
549  // to fill necessary metadata that is a common for all merged chunks
550  auto merged_chunk_iter = prepareChunkIter(merged_data_buffer,
551  merged_index_buffer,
552  *(local_chunk_iter_holder.rbegin()),
553  is_varlen_chunk,
554  total_num_tuples);
555  {
556  std::lock_guard<std::mutex> chunk_list_lock(chunk_list_mutex_);
557  chunk_holder.push_back(merged_chunk);
558  chunk_iter_holder.push_back(merged_chunk_iter);
559  }
560 
561  auto merged_chunk_iter_ptr = reinterpret_cast<int8_t*>(&(chunk_iter_holder.back()));
562  if (memory_level == MemoryLevel::CPU_LEVEL) {
563  addMergedChunkIter(col_desc, 0, merged_chunk_iter_ptr);
564  return merged_chunk_iter_ptr;
565  } else {
566  CHECK_EQ(Data_Namespace::GPU_LEVEL, memory_level);
567  CHECK(device_allocator);
568  addMergedChunkIter(col_desc, device_id, merged_chunk_iter_ptr);
569  // note that merged_chunk_iter_ptr resides in CPU memory space
570  // having its content aware GPU buffer that we alloc. for merging
571  // so we need to copy this chunk_iter to each device explicitly
572  auto chunk_iter_gpu = device_allocator->alloc(sizeof(ChunkIter));
573  device_allocator->copyToDevice(
574  chunk_iter_gpu, merged_chunk_iter_ptr, sizeof(ChunkIter));
575  return chunk_iter_gpu;
576  }
577 }
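Together with freeLinearizedBuf() and freeTemporaryCpuLinearizedIdxBuf() documented above, a multi-fragment varlen column is consumed roughly as follows. This is a sketch under the assumption that the caller mirrors Executor::fetchChunks; the surrounding variables are illustrative:

// Hedged sketch: linearize a multi-fragment varlen column, use it, then free it.
std::list<std::shared_ptr<Chunk_NS::Chunk>> chunk_holder;
std::list<ChunkIter> chunk_iter_holder;

// Returns a ChunkIter* (as int8_t*) over the merged chunk; at GPU_LEVEL the
// iterator is copied to the target device by the device_allocator.
const int8_t* merged_iter =
    column_fetcher.linearizeColumnFragments(table_id,
                                            col_id,
                                            all_tables_fragments,
                                            chunk_holder,
                                            chunk_iter_holder,
                                            Data_Namespace::CPU_LEVEL,
                                            /*device_id=*/0,
                                            /*device_allocator=*/nullptr,
                                            /*thread_idx=*/0);
// ... run kernels over merged_iter ...
column_fetcher.freeLinearizedBuf();                 // release merged data/index buffers
column_fetcher.freeTemporaryCpuLinearizedIdxBuf();  // release CPU-side temp index buffers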

MergedChunk ColumnFetcher::linearizeFixedLenArrayColFrags ( const Catalog_Namespace::Catalog &  cat,
std::list< std::shared_ptr< Chunk_NS::Chunk >> &  chunk_holder,
std::list< ChunkIter > &  chunk_iter_holder,
std::list< std::shared_ptr< Chunk_NS::Chunk >> &  local_chunk_holder,
std::list< ChunkIter > &  local_chunk_iter_holder,
std::list< size_t > &  local_chunk_num_tuples,
MemoryLevel  memory_level,
const ColumnDescriptor *  cd,
const int  device_id,
const size_t  total_data_buf_size,
const size_t  total_idx_buf_size,
const size_t  total_num_tuples,
DeviceAllocator *  device_allocator,
const size_t  thread_idx 
) const
private

Definition at line 902 of file ColumnFetcher.cpp.

References Data_Namespace::DataMgr::alloc(), Data_Namespace::AbstractBuffer::append(), CHECK_EQ, check_interrupt(), ColumnDescriptor::columnId, Data_Namespace::CPU_LEVEL, Executor::ERR_INTERRUPTED, g_enable_non_kernel_time_query_interrupt, Catalog_Namespace::Catalog::getDataMgr(), anonymous_namespace{ColumnFetcher.cpp}::getMemoryLevelString(), linearized_col_cache_mutex_, linearized_data_buf_cache_, ColumnDescriptor::tableId, timer_start(), timer_stop(), and VLOG.

Referenced by linearizeColumnFragments().

916  {
917  int64_t linearization_time_ms = 0;
918  auto clock_begin = timer_start();
919  // linearize collected fragments
920  AbstractBuffer* merged_data_buffer = nullptr;
921  bool has_cached_merged_data_buf = false;
922  const InputColDescriptor icd(cd->columnId, cd->tableId, int(0));
923  {
924  std::lock_guard<std::mutex> linearized_col_cache_guard(linearized_col_cache_mutex_);
925  auto cached_data_buf_cache_it = linearized_data_buf_cache_.find(icd);
926  if (cached_data_buf_cache_it != linearized_data_buf_cache_.end()) {
927  auto& cd_cache = cached_data_buf_cache_it->second;
928  auto cached_data_buf_it = cd_cache.find(device_id);
929  if (cached_data_buf_it != cd_cache.end()) {
930  has_cached_merged_data_buf = true;
931  merged_data_buffer = cached_data_buf_it->second;
932  VLOG(2) << "Recycle merged data buffer for linearized chunks (memory_level: "
933  << getMemoryLevelString(memory_level) << ", device_id: " << device_id
934  << ")";
935  } else {
936  merged_data_buffer =
937  cat.getDataMgr().alloc(memory_level, device_id, total_data_buf_size);
938  VLOG(2) << "Allocate " << total_data_buf_size
939  << " bytes of data buffer space for linearized chunks (memory_level: "
940  << getMemoryLevelString(memory_level) << ", device_id: " << device_id
941  << ")";
942  cd_cache.insert(std::make_pair(device_id, merged_data_buffer));
943  }
944  } else {
945  DeviceMergedChunkMap m;
946  merged_data_buffer =
947  cat.getDataMgr().alloc(memory_level, device_id, total_data_buf_size);
948  VLOG(2) << "Allocate " << total_data_buf_size
949  << " bytes of data buffer space for linearized chunks (memory_level: "
950  << getMemoryLevelString(memory_level) << ", device_id: " << device_id
951  << ")";
952  m.insert(std::make_pair(device_id, merged_data_buffer));
953  linearized_data_buf_cache_.insert(std::make_pair(icd, m));
954  }
955  }
956  if (!has_cached_merged_data_buf) {
957  size_t sum_data_buf_size = 0;
958  auto chunk_holder_it = local_chunk_holder.begin();
959  auto chunk_iter_holder_it = local_chunk_iter_holder.begin();
960  for (; chunk_holder_it != local_chunk_holder.end();
961  chunk_holder_it++, chunk_iter_holder_it++) {
962  if (g_enable_non_kernel_time_query_interrupt && check_interrupt()) {
963  throw QueryExecutionError(Executor::ERR_INTERRUPTED);
964  }
965  auto target_chunk = chunk_holder_it->get();
966  auto target_chunk_data_buffer = target_chunk->getBuffer();
967  merged_data_buffer->append(target_chunk_data_buffer->getMemoryPtr(),
968  target_chunk_data_buffer->size(),
969  Data_Namespace::CPU_LEVEL,
970  device_id);
971  sum_data_buf_size += target_chunk_data_buffer->size();
972  }
973  // check whether each chunk's data buffer is clean under chunk merging
974  CHECK_EQ(total_data_buf_size, sum_data_buf_size);
975  }
976  linearization_time_ms += timer_stop(clock_begin);
977  VLOG(2) << "Linearization has been successfully done, elapsed time: "
978  << linearization_time_ms << " ms.";
979  return {merged_data_buffer, nullptr};
980 }

MergedChunk ColumnFetcher::linearizeVarLenArrayColFrags ( const Catalog_Namespace::Catalog &  cat,
std::list< std::shared_ptr< Chunk_NS::Chunk >> &  chunk_holder,
std::list< ChunkIter > &  chunk_iter_holder,
std::list< std::shared_ptr< Chunk_NS::Chunk >> &  local_chunk_holder,
std::list< ChunkIter > &  local_chunk_iter_holder,
std::list< size_t > &  local_chunk_num_tuples,
MemoryLevel  memory_level,
const ColumnDescriptor *  cd,
const int  device_id,
const size_t  total_data_buf_size,
const size_t  total_idx_buf_size,
const size_t  total_num_tuples,
DeviceAllocator *  device_allocator,
const size_t  thread_idx 
) const
private

Definition at line 579 of file ColumnFetcher.cpp.

References Data_Namespace::DataMgr::alloc(), Data_Namespace::AbstractBuffer::append(), threading_serial::async(), CHECK, ColumnDescriptor::columnId, gpu_enabled::copy(), DeviceAllocator::copyToDevice(), Data_Namespace::CPU_LEVEL, cpu_threads(), ArrayNoneEncoder::DEFAULT_NULL_PADDING_SIZE, run_benchmark_import::dest, Executor::ERR_INTERRUPTED, executor_, g_enable_non_kernel_time_query_interrupt, g_enable_parallel_linearization, Catalog_Namespace::Catalog::getDataMgr(), anonymous_namespace{ColumnFetcher.cpp}::getMemoryLevelString(), Data_Namespace::AbstractBuffer::getMemoryPtr(), Data_Namespace::GPU_LEVEL, LIKELY, linearized_col_cache_mutex_, linearized_data_buf_cache_, linearized_idx_buf_cache_, linearlized_temporary_cpu_index_buf_cache_, makeIntervals(), gpu_enabled::sort(), ColumnDescriptor::tableId, timer_start(), timer_stop(), and VLOG.

Referenced by linearizeColumnFragments().

593  {
594  // for linearization of varlen col we have to deal with not only data buffer
595  // but also its underlying index buffer which is responsible for offset of varlen value
596  // basically we maintain per-device linearized (data/index) buffer
597  // for data buffer, we linearize varlen col's chunks within a device-specific buffer
598  // by just appending each chunk
599  // for index buffer, we need to not only appending each chunk but modify the offset
600  // value to affect various conditions like nullness, padding and so on so we first
601  // append index buffer in CPU, manipulate it as we required and then copy it to specific
602  // device if necessary (for GPU execution)
603  AbstractBuffer* merged_index_buffer_in_cpu = nullptr;
604  AbstractBuffer* merged_data_buffer = nullptr;
605  bool has_cached_merged_idx_buf = false;
606  bool has_cached_merged_data_buf = false;
607  const InputColDescriptor icd(cd->columnId, cd->tableId, int(0));
608  // check linearized buffer's cache first
609  // if not exists, alloc necessary buffer space to prepare linearization
610  int64_t linearization_time_ms = 0;
611  auto clock_begin = timer_start();
612  {
613  std::lock_guard<std::mutex> linearized_col_cache_guard(linearized_col_cache_mutex_);
614  auto cached_data_buf_cache_it = linearized_data_buf_cache_.find(icd);
615  if (cached_data_buf_cache_it != linearized_data_buf_cache_.end()) {
616  auto& cd_cache = cached_data_buf_cache_it->second;
617  auto cached_data_buf_it = cd_cache.find(device_id);
618  if (cached_data_buf_it != cd_cache.end()) {
619  has_cached_merged_data_buf = true;
620  merged_data_buffer = cached_data_buf_it->second;
621  VLOG(2) << "Recycle merged data buffer for linearized chunks (memory_level: "
622  << getMemoryLevelString(memory_level) << ", device_id: " << device_id
623  << ")";
624  } else {
625  merged_data_buffer =
626  cat.getDataMgr().alloc(memory_level, device_id, total_data_buf_size);
627  VLOG(2) << "Allocate " << total_data_buf_size
628  << " bytes of data buffer space for linearized chunks (memory_level: "
629  << getMemoryLevelString(memory_level) << ", device_id: " << device_id
630  << ")";
631  cd_cache.insert(std::make_pair(device_id, merged_data_buffer));
632  }
633  } else {
634  DeviceMergedChunkMap m;
635  merged_data_buffer =
636  cat.getDataMgr().alloc(memory_level, device_id, total_data_buf_size);
637  VLOG(2) << "Allocate " << total_data_buf_size
638  << " bytes of data buffer space for linearized chunks (memory_level: "
639  << getMemoryLevelString(memory_level) << ", device_id: " << device_id
640  << ")";
641  m.insert(std::make_pair(device_id, merged_data_buffer));
642  linearized_data_buf_cache_.insert(std::make_pair(icd, m));
643  }
644 
645  auto cached_index_buf_it =
646  linearlized_temporary_cpu_index_buf_cache_.find(cd->columnId);
647  if (cached_index_buf_it != linearlized_temporary_cpu_index_buf_cache_.end()) {
648  has_cached_merged_idx_buf = true;
649  merged_index_buffer_in_cpu = cached_index_buf_it->second;
650  VLOG(2)
651  << "Recycle merged temporary idx buffer for linearized chunks (memory_level: "
652  << getMemoryLevelString(memory_level) << ", device_id: " << device_id << ")";
653  } else {
654  auto idx_buf_size = total_idx_buf_size + sizeof(ArrayOffsetT);
655  merged_index_buffer_in_cpu =
656  cat.getDataMgr().alloc(Data_Namespace::CPU_LEVEL, 0, idx_buf_size);
657  VLOG(2) << "Allocate " << idx_buf_size
658  << " bytes of temporary idx buffer space on CPU for linearized chunks";
659  // just copy the buf addr since we access it via the pointer itself
660  linearlized_temporary_cpu_index_buf_cache_.insert(
661  std::make_pair(cd->columnId, merged_index_buffer_in_cpu));
662  }
663  }
664 
665  // linearize buffers if we don't have corresponding buf in cache
666  size_t sum_data_buf_size = 0;
667  size_t cur_sum_num_tuples = 0;
668  size_t total_idx_size_modifier = 0;
669  auto chunk_holder_it = local_chunk_holder.begin();
670  auto chunk_iter_holder_it = local_chunk_iter_holder.begin();
671  auto chunk_num_tuple_it = local_chunk_num_tuples.begin();
672  bool null_padded_first_elem = false;
673  bool null_padded_last_val = false;
674  // before entering the actual linearization part, we first need to check
675  // the overflow case where the sum of index offset becomes larger than 2GB
676  // which currently incurs incorrect query result due to negative array offset
677  // note that we can separate this from the main linearization logic b/c
678  // we just need to see few last elems
679  // todo (yoonmin) : relax this to support larger chunk size (>2GB)
680  for (; chunk_holder_it != local_chunk_holder.end();
681  chunk_holder_it++, chunk_num_tuple_it++) {
682  // check the offset overflow based on the last "valid" offset for each chunk
683  auto target_chunk = chunk_holder_it->get();
684  auto target_chunk_data_buffer = target_chunk->getBuffer();
685  auto target_chunk_idx_buffer = target_chunk->getIndexBuf();
686  auto target_idx_buf_ptr =
687  reinterpret_cast<ArrayOffsetT*>(target_chunk_idx_buffer->getMemoryPtr());
688  auto cur_chunk_num_tuples = *chunk_num_tuple_it;
689  ArrayOffsetT original_offset = -1;
690  size_t cur_idx = cur_chunk_num_tuples;
691  // find the valid (e.g., non-null) offset starting from the last elem
692  while (original_offset < 0) {
693  original_offset = target_idx_buf_ptr[--cur_idx];
694  }
695  ArrayOffsetT new_offset = original_offset + sum_data_buf_size;
696  if (new_offset < 0) {
697  throw std::runtime_error(
698  "Linearization of a variable-length column having chunk size larger than 2GB "
699  "not supported yet");
700  }
701  sum_data_buf_size += target_chunk_data_buffer->size();
702  }
703  chunk_holder_it = local_chunk_holder.begin();
704  chunk_num_tuple_it = local_chunk_num_tuples.begin();
705  sum_data_buf_size = 0;
706 
707  for (; chunk_holder_it != local_chunk_holder.end();
708  chunk_holder_it++, chunk_iter_holder_it++, chunk_num_tuple_it++) {
709  if (g_enable_non_kernel_time_query_interrupt &&
710  executor_->checkNonKernelTimeInterrupted()) {
711  throw QueryExecutionError(Executor::ERR_INTERRUPTED);
712  }
713  auto target_chunk = chunk_holder_it->get();
714  auto target_chunk_data_buffer = target_chunk->getBuffer();
715  auto cur_chunk_num_tuples = *chunk_num_tuple_it;
716  auto target_chunk_idx_buffer = target_chunk->getIndexBuf();
717  auto target_idx_buf_ptr =
718  reinterpret_cast<ArrayOffsetT*>(target_chunk_idx_buffer->getMemoryPtr());
719  auto idx_buf_size = target_chunk_idx_buffer->size() - sizeof(ArrayOffsetT);
720  auto target_data_buffer_start_ptr = target_chunk_data_buffer->getMemoryPtr();
721  auto target_data_buffer_size = target_chunk_data_buffer->size();
722 
723  // when linearizing idx buffers, we need to consider the following cases
724  // 1. the first idx val is padded (a. null / b. empty varlen arr / c. 1-byte size
725  // varlen arr, i.e., {1})
726  // 2. the last idx val is null
727  // 3. null value(s) is/are located in a middle of idx buf <-- we don't need to care
728  if (cur_sum_num_tuples > 0 && target_idx_buf_ptr[0] > 0) {
729  null_padded_first_elem = true;
730  target_data_buffer_start_ptr += ArrayNoneEncoder::DEFAULT_NULL_PADDING_SIZE;
731  target_data_buffer_size -= ArrayNoneEncoder::DEFAULT_NULL_PADDING_SIZE;
732  total_idx_size_modifier += ArrayNoneEncoder::DEFAULT_NULL_PADDING_SIZE;
733  }
734  // we linearize data_buf in device-specific buffer
735  if (!has_cached_merged_data_buf) {
736  merged_data_buffer->append(target_data_buffer_start_ptr,
737  target_data_buffer_size,
738  Data_Namespace::CPU_LEVEL,
739  device_id);
740  }
741 
742  if (!has_cached_merged_idx_buf) {
743  // linearize idx buf in CPU first
744  merged_index_buffer_in_cpu->append(target_chunk_idx_buffer->getMemoryPtr(),
745  idx_buf_size,
746  Data_Namespace::CPU_LEVEL,
747  0); // merged_index_buffer_in_cpu resides in CPU
748  auto idx_buf_ptr =
749  reinterpret_cast<ArrayOffsetT*>(merged_index_buffer_in_cpu->getMemoryPtr());
750  // here, we do not need to manipulate the very first idx buf, just let it as is
751  // and modify otherwise (i.e., starting from second chunk idx buf)
752  if (cur_sum_num_tuples > 0) {
753  if (null_padded_last_val) {
754  // case 2. the previous chunk's last index val is null so we need to set this
755  // chunk's first val to be null
756  idx_buf_ptr[cur_sum_num_tuples] = -sum_data_buf_size;
757  }
758  const size_t worker_count = cpu_threads();
759  std::vector<std::future<void>> conversion_threads;
760  std::vector<std::vector<size_t>> null_padded_row_idx_vecs(worker_count,
761  std::vector<size_t>());
762  bool is_parallel_modification = false;
763  std::vector<size_t> null_padded_row_idx_vec;
764  const auto do_work = [&cur_sum_num_tuples,
765  &sum_data_buf_size,
766  &null_padded_first_elem,
767  &idx_buf_ptr](
768  const size_t start,
769  const size_t end,
770  const bool is_parallel_modification,
771  std::vector<size_t>* null_padded_row_idx_vec) {
772  for (size_t i = start; i < end; i++) {
773  if (LIKELY(idx_buf_ptr[cur_sum_num_tuples + i] >= 0)) {
774  if (null_padded_first_elem) {
775  // deal with null padded bytes
776  idx_buf_ptr[cur_sum_num_tuples + i] -=
777  ArrayNoneEncoder::DEFAULT_NULL_PADDING_SIZE;
778  }
779  idx_buf_ptr[cur_sum_num_tuples + i] += sum_data_buf_size;
780  } else {
781  // a null padded row needs to reference the previous row's idx, so in
782  // multi-threaded index modification we may suffer from thread
783  // contention when thread-i needs to reference thread-j's row idx; we
784  // therefore collect row idxs for null rows here and deal with them
785  // after this step
786  null_padded_row_idx_vec->push_back(cur_sum_num_tuples + i);
787  }
788  }
789  };
790  if (cur_chunk_num_tuples > g_enable_parallel_linearization) {
791  is_parallel_modification = true;
792  for (auto interval :
793  makeIntervals(size_t(0), cur_chunk_num_tuples, worker_count)) {
794  conversion_threads.push_back(
795  std::async(std::launch::async,
796  do_work,
797  interval.begin,
798  interval.end,
799  is_parallel_modification,
800  &null_padded_row_idx_vecs[interval.index]));
801  }
802  for (auto& child : conversion_threads) {
803  child.wait();
804  }
805  for (auto& v : null_padded_row_idx_vecs) {
806  std::copy(v.begin(), v.end(), std::back_inserter(null_padded_row_idx_vec));
807  }
808  } else {
809  do_work(size_t(0),
810  cur_chunk_num_tuples,
811  is_parallel_modification,
812  &null_padded_row_idx_vec);
813  }
814  if (!null_padded_row_idx_vec.empty()) {
815  // modify null padded row idxs by referencing the previous row
816  // here we sort row idxs to correctly propagate modified row idxs
817  std::sort(null_padded_row_idx_vec.begin(), null_padded_row_idx_vec.end());
818  for (auto& padded_null_row_idx : null_padded_row_idx_vec) {
819  if (idx_buf_ptr[padded_null_row_idx - 1] > 0) {
820  idx_buf_ptr[padded_null_row_idx] = -idx_buf_ptr[padded_null_row_idx - 1];
821  } else {
822  idx_buf_ptr[padded_null_row_idx] = idx_buf_ptr[padded_null_row_idx - 1];
823  }
824  }
825  }
826  }
827  }
828  cur_sum_num_tuples += cur_chunk_num_tuples;
829  sum_data_buf_size += target_chunk_data_buffer->size();
830  if (target_idx_buf_ptr[*chunk_num_tuple_it] < 0) {
831  null_padded_last_val = true;
832  } else {
833  null_padded_last_val = false;
834  }
835  if (null_padded_first_elem) {
836  sum_data_buf_size -= ArrayNoneEncoder::DEFAULT_NULL_PADDING_SIZE;
837  null_padded_first_elem = false; // set for the next chunk
838  }
839  if (!has_cached_merged_idx_buf && cur_sum_num_tuples == total_num_tuples) {
840  auto merged_index_buffer_ptr =
841  reinterpret_cast<ArrayOffsetT*>(merged_index_buffer_in_cpu->getMemoryPtr());
842  merged_index_buffer_ptr[total_num_tuples] =
843  total_data_buf_size -
844  total_idx_size_modifier; // last index value is total data size;
845  }
846  }
847 
848  // put the linearized index buffer into the per-device cache
849  AbstractBuffer* merged_index_buffer = nullptr;
850  size_t buf_size = total_idx_buf_size + sizeof(ArrayOffsetT);
851  auto copyBuf =
852  [&device_allocator](
853  int8_t* src, int8_t* dest, size_t buf_size, MemoryLevel memory_level) {
854  if (memory_level == Data_Namespace::CPU_LEVEL) {
855  memcpy((void*)dest, src, buf_size);
856  } else {
857  CHECK(memory_level == Data_Namespace::GPU_LEVEL);
858  device_allocator->copyToDevice(dest, src, buf_size);
859  }
860  };
861  {
862  std::lock_guard<std::mutex> linearized_col_cache_guard(linearized_col_cache_mutex_);
863  auto merged_idx_buf_cache_it = linearized_idx_buf_cache_.find(icd);
864  // for CPU execution, we can use `merged_index_buffer_in_cpu` as is
865  // but for GPU, we have to copy it to the corresponding device
866  if (memory_level == MemoryLevel::GPU_LEVEL) {
867  if (merged_idx_buf_cache_it != linearized_idx_buf_cache_.end()) {
868  auto& merged_idx_buf_cache = merged_idx_buf_cache_it->second;
869  auto merged_idx_buf_it = merged_idx_buf_cache.find(device_id);
870  if (merged_idx_buf_it != merged_idx_buf_cache.end()) {
871  merged_index_buffer = merged_idx_buf_it->second;
872  } else {
873  merged_index_buffer = cat.getDataMgr().alloc(memory_level, device_id, buf_size);
874  copyBuf(merged_index_buffer_in_cpu->getMemoryPtr(),
875  merged_index_buffer->getMemoryPtr(),
876  buf_size,
877  memory_level);
878  merged_idx_buf_cache.insert(std::make_pair(device_id, merged_index_buffer));
879  }
880  } else {
881  merged_index_buffer = cat.getDataMgr().alloc(memory_level, device_id, buf_size);
882  copyBuf(merged_index_buffer_in_cpu->getMemoryPtr(),
883  merged_index_buffer->getMemoryPtr(),
884  buf_size,
885  memory_level);
886  DeviceMergedChunkMap m;
887  m.insert(std::make_pair(device_id, merged_index_buffer));
888  linearized_idx_buf_cache_.insert(std::make_pair(icd, m));
889  }
890  } else {
891  // `linearlized_temporary_cpu_index_buf_cache_` has this buf
892  merged_index_buffer = merged_index_buffer_in_cpu;
893  }
894  }
895  CHECK(merged_index_buffer);
896  linearization_time_ms += timer_stop(clock_begin);
897  VLOG(2) << "Linearization has been successfully done, elapsed time: "
898  << linearization_time_ms << " ms.";
899  return {merged_data_buffer, merged_index_buffer};
900 }

JoinColumn ColumnFetcher::makeJoinColumn ( Executor *  executor,
const Analyzer::ColumnVar &  hash_col,
const std::vector< Fragmenter_Namespace::FragmentInfo > &  fragments,
const Data_Namespace::MemoryLevel  effective_mem_lvl,
const int  device_id,
DeviceAllocator *  device_allocator,
const size_t  thread_idx,
std::vector< std::shared_ptr< Chunk_NS::Chunk >> &  chunks_owner,
std::vector< std::shared_ptr< void >> &  malloc_owner,
ColumnCacheMap &  column_cache 
)
static

Creates a JoinColumn struct containing an array of JoinChunk structs.

makeJoinColumn() creates a JoinColumn struct containing an array of JoinChunk structs, col_chunks_buff, malloced in CPU memory. Although the col_chunks_buff array is in CPU memory here, each JoinChunk struct contains an int8_t* pointer from getOneColumnFragment(), col_buff, that can point to either CPU memory or GPU memory depending on the effective_mem_lvl parameter. See also the fetchJoinColumn() function, where col_chunks_buff is copied into GPU memory if needed. The malloc_owner parameter will have the malloced array appended. The fetched chunks will be appended to the chunks_owner parameter.

Definition at line 155 of file ColumnFetcher.cpp.

References CHECK, CHECK_GT, checked_malloc(), JoinChunk::col_buff, Data_Namespace::CPU_LEVEL, Executor::ERR_INTERRUPTED, g_enable_non_kernel_time_query_interrupt, SQLTypeInfo::get_size(), Analyzer::Expr::get_type_info(), getOneColumnFragment(), and JoinChunk::num_elems.

Referenced by HashJoin::fetchJoinColumn().

165  {
166  CHECK(!fragments.empty());
167 
168  size_t col_chunks_buff_sz = sizeof(struct JoinChunk) * fragments.size();
169  // TODO: needs an allocator owner
170  auto col_chunks_buff = reinterpret_cast<int8_t*>(
171  malloc_owner.emplace_back(checked_malloc(col_chunks_buff_sz), free).get());
172  auto join_chunk_array = reinterpret_cast<struct JoinChunk*>(col_chunks_buff);
173 
174  size_t num_elems = 0;
175  size_t num_chunks = 0;
176  for (auto& frag : fragments) {
177  if (g_enable_non_kernel_time_query_interrupt &&
178  executor->checkNonKernelTimeInterrupted()) {
179  throw QueryExecutionError(Executor::ERR_INTERRUPTED);
180  }
181  auto [col_buff, elem_count] = getOneColumnFragment(
182  executor,
183  hash_col,
184  frag,
185  effective_mem_lvl,
186  effective_mem_lvl == Data_Namespace::CPU_LEVEL ? 0 : device_id,
187  device_allocator,
188  thread_idx,
189  chunks_owner,
190  column_cache);
191  if (col_buff != nullptr) {
192  num_elems += elem_count;
193  join_chunk_array[num_chunks] = JoinChunk{col_buff, elem_count};
194  } else {
195  continue;
196  }
197  ++num_chunks;
198  }
199 
200  int elem_sz = hash_col.get_type_info().get_size();
201  CHECK_GT(elem_sz, 0);
202 
203  return {col_chunks_buff,
204  col_chunks_buff_sz,
205  num_chunks,
206  num_elems,
207  static_cast<size_t>(elem_sz)};
208 }

ChunkIter ColumnFetcher::prepareChunkIter ( AbstractBuffer *  merged_data_buf,
AbstractBuffer *  merged_index_buf,
ChunkIter &  chunk_iter,
bool  is_true_varlen_type,
const size_t  total_num_tuples 
) const
private

Definition at line 1048 of file ColumnFetcher.cpp.

References ChunkIter::current_pos, ChunkIter::end_pos, Data_Namespace::AbstractBuffer::getMemoryPtr(), ChunkIter::num_elems, ChunkIter::second_buf, Data_Namespace::AbstractBuffer::size(), ChunkIter::skip, ChunkIter::skip_size, ChunkIter::start_pos, and ChunkIter::type_info.

Referenced by linearizeColumnFragments().

1052  {
1053  ChunkIter merged_chunk_iter;
1054  if (is_true_varlen_type) {
1055  merged_chunk_iter.start_pos = merged_index_buf->getMemoryPtr();
1056  merged_chunk_iter.current_pos = merged_index_buf->getMemoryPtr();
1057  merged_chunk_iter.end_pos = merged_index_buf->getMemoryPtr() +
1058  merged_index_buf->size() - sizeof(StringOffsetT);
1059  merged_chunk_iter.second_buf = merged_data_buf->getMemoryPtr();
1060  } else {
1061  merged_chunk_iter.start_pos = merged_data_buf->getMemoryPtr();
1062  merged_chunk_iter.current_pos = merged_data_buf->getMemoryPtr();
1063  merged_chunk_iter.end_pos = merged_data_buf->getMemoryPtr() + merged_data_buf->size();
1064  merged_chunk_iter.second_buf = nullptr;
1065  }
1066  merged_chunk_iter.num_elems = total_num_tuples;
1067  merged_chunk_iter.skip = chunk_iter.skip;
1068  merged_chunk_iter.skip_size = chunk_iter.skip_size;
1069  merged_chunk_iter.type_info = chunk_iter.type_info;
1070  return merged_chunk_iter;
1071 }

const int8_t * ColumnFetcher::transferColumnIfNeeded ( const ColumnarResults *  columnar_results,
const int  col_id,
Data_Namespace::DataMgr *  data_mgr,
const Data_Namespace::MemoryLevel  memory_level,
const int  device_id,
DeviceAllocator *  device_allocator 
)
static private

Definition at line 982 of file ColumnFetcher.cpp.

References Allocator::alloc(), CHECK, CHECK_LT, DeviceAllocator::copyToDevice(), FlatBufferManager::getBufferSize(), ColumnarResults::getColumnBuffers(), ColumnarResults::getColumnType(), Data_Namespace::GPU_LEVEL, FlatBufferManager::isFlatBuffer(), and ColumnarResults::size().

Referenced by getAllTableColumnFragments(), getOneColumnFragment(), and getResultSetColumn().

988  {
989  if (!columnar_results) {
990  return nullptr;
991  }
992  const auto& col_buffers = columnar_results->getColumnBuffers();
993  CHECK_LT(static_cast<size_t>(col_id), col_buffers.size());
994  if (memory_level == Data_Namespace::GPU_LEVEL) {
995  const auto& col_ti = columnar_results->getColumnType(col_id);
996  size_t num_bytes;
997  if (col_ti.is_array() && FlatBufferManager::isFlatBuffer(col_buffers[col_id])) {
998  num_bytes = FlatBufferManager::getBufferSize(col_buffers[col_id]);
999  } else {
1000  num_bytes = columnar_results->size() * col_ti.get_size();
1001  }
1002  CHECK(device_allocator);
1003  auto gpu_col_buffer = device_allocator->alloc(num_bytes);
1004  device_allocator->copyToDevice(gpu_col_buffer, col_buffers[col_id], num_bytes);
1005  return gpu_col_buffer;
1006  }
1007  return col_buffers[col_id];
1008 }

Friends And Related Function Documentation

friend class QueryCompilationDescriptor
friend

Definition at line 200 of file ColumnFetcher.h.

friend class TableFunctionExecutionContext
friend

Definition at line 201 of file ColumnFetcher.h.

Member Data Documentation

std::mutex ColumnFetcher::chunk_list_mutex_
mutable private

Definition at line 184 of file ColumnFetcher.h.

Referenced by getOneTableColumnFragment(), and linearizeColumnFragments().

std::mutex ColumnFetcher::columnar_fetch_mutex_
mutable private

Definition at line 181 of file ColumnFetcher.h.

Referenced by getAllTableColumnFragments(), and getResultSetColumn().

std::unordered_map<InputColDescriptor, std::unique_ptr<const ColumnarResults> > ColumnFetcher::columnarized_scan_table_cache_
mutable private

Definition at line 188 of file ColumnFetcher.h.

Referenced by getAllTableColumnFragments().

ColumnCacheMap ColumnFetcher::columnarized_table_cache_
mutable private
std::mutex ColumnFetcher::linearization_mutex_
mutable private

Definition at line 183 of file ColumnFetcher.h.

Referenced by linearizeColumnFragments().

std::mutex ColumnFetcher::linearized_col_cache_mutex_
mutable private
std::unordered_map<InputColDescriptor, DeviceMergedChunkMap> ColumnFetcher::linearized_data_buf_cache_
mutable private
std::unordered_map<InputColDescriptor, DeviceMergedChunkMap> ColumnFetcher::linearized_idx_buf_cache_
mutable private

Definition at line 198 of file ColumnFetcher.h.

Referenced by freeLinearizedBuf(), and linearizeVarLenArrayColFrags().

std::unordered_map<InputColDescriptor, DeviceMergedChunkIterMap> ColumnFetcher::linearized_multi_frag_chunk_iter_cache_
mutable private

Definition at line 192 of file ColumnFetcher.h.

Referenced by addMergedChunkIter(), getChunkiter(), and linearizeColumnFragments().

std::unordered_map<int, AbstractBuffer*> ColumnFetcher::linearlized_temporary_cpu_index_buf_cache_
mutable private
std::mutex ColumnFetcher::varlen_chunk_fetch_mutex_
mutable private

Definition at line 182 of file ColumnFetcher.h.

Referenced by getOneTableColumnFragment(), and linearizeColumnFragments().


The documentation for this class was generated from the following files:

ColumnFetcher.h
ColumnFetcher.cpp