OmniSciDB  ca0c39ec8f
ColumnFetcher.cpp
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "QueryEngine/ColumnFetcher.h"
18 
19 #include <memory>
20 
21 #include "DataMgr/ArrayNoneEncoder.h"
22 #include "QueryEngine/ErrorHandling.h"
23 #include "QueryEngine/Execute.h"
24 #include "Shared/Intervals.h"
25 #include "Shared/likely.h"
26 #include "Shared/sqltypes.h"
27 
28 namespace {
29 
30 const ColumnarResults* columnarize_result(
31  std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
32  const ResultSetPtr& result,
33  const size_t thread_idx,
34  const size_t executor_id,
35  const int frag_id) {
36  INJECT_TIMER(columnarize_result);
37  CHECK_EQ(0, frag_id);
38 
39  std::vector<SQLTypeInfo> col_types;
40  for (size_t i = 0; i < result->colCount(); ++i) {
41  col_types.push_back(get_logical_type_info(result->getColType(i)));
42  }
43  return new ColumnarResults(
44  row_set_mem_owner, *result, result->colCount(), col_types, executor_id, thread_idx);
45 }
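// columnarize_result() materializes the ResultSet backing a temporary table into a
// ColumnarResults object, using the logical type of each output column. Only fragment 0
// exists for such intermediate results, hence the CHECK on frag_id above.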
46 
47 std::string getMemoryLevelString(Data_Namespace::MemoryLevel memoryLevel) {
48  switch (memoryLevel) {
49  case DISK_LEVEL:
50  return "DISK_LEVEL";
51  case GPU_LEVEL:
52  return "GPU_LEVEL";
53  case CPU_LEVEL:
54  return "CPU_LEVEL";
55  default:
56  return "UNKNOWN";
57  }
58 }
59 } // namespace
60 
61 ColumnFetcher::ColumnFetcher(Executor* executor, const ColumnCacheMap& column_cache)
62  : executor_(executor), columnarized_table_cache_(column_cache) {}
63 
67 std::pair<const int8_t*, size_t> ColumnFetcher::getOneColumnFragment(
68  Executor* executor,
69  const Analyzer::ColumnVar& hash_col,
70  const Fragmenter_Namespace::FragmentInfo& fragment,
71  const Data_Namespace::MemoryLevel effective_mem_lvl,
72  const int device_id,
73  DeviceAllocator* device_allocator,
74  const size_t thread_idx,
75  std::vector<std::shared_ptr<Chunk_NS::Chunk>>& chunks_owner,
76  ColumnCacheMap& column_cache) {
77  static std::mutex columnar_conversion_mutex;
78  auto timer = DEBUG_TIMER(__func__);
79  if (fragment.isEmptyPhysicalFragment()) {
80  return {nullptr, 0};
81  }
82  const auto table_id = hash_col.get_table_id();
83  const auto& catalog = *executor->getCatalog();
84  const auto cd =
85  get_column_descriptor_maybe(hash_col.get_column_id(), table_id, catalog);
86  CHECK(!cd || !(cd->isVirtualCol));
87  const int8_t* col_buff = nullptr;
88  if (cd) { // real table
89  /* chunk_meta_it is used here to retrieve chunk numBytes and
90  numElements. Apparently, their values are often zeros. If we
91  knew how to predict the zero values, calling
92  getChunkMetadataMap could be avoided to skip
93  synthesize_metadata calls. */
94  auto chunk_meta_it = fragment.getChunkMetadataMap().find(hash_col.get_column_id());
95  CHECK(chunk_meta_it != fragment.getChunkMetadataMap().end());
96  ChunkKey chunk_key{catalog.getCurrentDB().dbId,
97  fragment.physicalTableId,
98  hash_col.get_column_id(),
99  fragment.fragmentId};
100  const auto chunk = Chunk_NS::Chunk::getChunk(
101  cd,
102  &catalog.getDataMgr(),
103  chunk_key,
104  effective_mem_lvl,
105  effective_mem_lvl == Data_Namespace::CPU_LEVEL ? 0 : device_id,
106  chunk_meta_it->second->numBytes,
107  chunk_meta_it->second->numElements);
108  chunks_owner.push_back(chunk);
109  CHECK(chunk);
110  auto ab = chunk->getBuffer();
111  CHECK(ab->getMemoryPtr());
112  col_buff = reinterpret_cast<int8_t*>(ab->getMemoryPtr());
113  } else { // temporary table
114  const ColumnarResults* col_frag{nullptr};
115  {
116  std::lock_guard<std::mutex> columnar_conversion_guard(columnar_conversion_mutex);
117  const auto frag_id = fragment.fragmentId;
118  if (column_cache.empty() || !column_cache.count(table_id)) {
119  column_cache.insert(std::make_pair(
120  table_id, std::unordered_map<int, std::shared_ptr<const ColumnarResults>>()));
121  }
122  auto& frag_id_to_result = column_cache[table_id];
123  if (frag_id_to_result.empty() || !frag_id_to_result.count(frag_id)) {
124  frag_id_to_result.insert(
125  std::make_pair(frag_id,
126  std::shared_ptr<const ColumnarResults>(columnarize_result(
127  executor->row_set_mem_owner_,
128  get_temporary_table(executor->temporary_tables_, table_id),
129  executor->executor_id_,
130  thread_idx,
131  frag_id))));
132  }
133  col_frag = column_cache[table_id][frag_id].get();
134  }
135  col_buff = transferColumnIfNeeded(
136  col_frag,
137  hash_col.get_column_id(),
138  &catalog.getDataMgr(),
139  effective_mem_lvl,
140  effective_mem_lvl == Data_Namespace::CPU_LEVEL ? 0 : device_id,
141  device_allocator);
142  }
143  return {col_buff, fragment.getNumTuples()};
144 }
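// For a catalog ("real") table, the fragment's chunk is pinned via Chunk::getChunk and
// its buffer is returned directly; for a temporary table, the ResultSet is columnarized
// once per (table, fragment), cached in column_cache, and transferred to the requested
// memory level. An illustrative call site (hypothetical local names, not from this file):
//
//   std::vector<std::shared_ptr<Chunk_NS::Chunk>> chunks_owner;
//   auto [buf, num_rows] = ColumnFetcher::getOneColumnFragment(
//       executor, hash_col, frag_info, Data_Namespace::CPU_LEVEL,
//       /*device_id=*/0, /*device_allocator=*/nullptr, /*thread_idx=*/0,
//       chunks_owner, column_cache);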
145 
155 JoinColumn ColumnFetcher::makeJoinColumn(
156  Executor* executor,
157  const Analyzer::ColumnVar& hash_col,
158  const std::vector<Fragmenter_Namespace::FragmentInfo>& fragments,
159  const Data_Namespace::MemoryLevel effective_mem_lvl,
160  const int device_id,
161  DeviceAllocator* device_allocator,
162  const size_t thread_idx,
163  std::vector<std::shared_ptr<Chunk_NS::Chunk>>& chunks_owner,
164  std::vector<std::shared_ptr<void>>& malloc_owner,
165  ColumnCacheMap& column_cache) {
166  CHECK(!fragments.empty());
167 
168  size_t col_chunks_buff_sz = sizeof(struct JoinChunk) * fragments.size();
169  // TODO: needs an allocator owner
170  auto col_chunks_buff = reinterpret_cast<int8_t*>(
171  malloc_owner.emplace_back(checked_malloc(col_chunks_buff_sz), free).get());
172  auto join_chunk_array = reinterpret_cast<struct JoinChunk*>(col_chunks_buff);
173 
174  size_t num_elems = 0;
175  size_t num_chunks = 0;
176  for (auto& frag : fragments) {
177  if (g_enable_non_kernel_time_query_interrupt &&
178  executor->checkNonKernelTimeInterrupted()) {
179  throw QueryExecutionError(Executor::ERR_INTERRUPTED);
180  }
181  auto [col_buff, elem_count] = getOneColumnFragment(
182  executor,
183  hash_col,
184  frag,
185  effective_mem_lvl,
186  effective_mem_lvl == Data_Namespace::CPU_LEVEL ? 0 : device_id,
187  device_allocator,
188  thread_idx,
189  chunks_owner,
190  column_cache);
191  if (col_buff != nullptr) {
192  num_elems += elem_count;
193  join_chunk_array[num_chunks] = JoinChunk{col_buff, elem_count};
194  } else {
195  continue;
196  }
197  ++num_chunks;
198  }
199 
200  int elem_sz = hash_col.get_type_info().get_size();
201  CHECK_GT(elem_sz, 0);
202 
203  return {col_chunks_buff,
204  col_chunks_buff_sz,
205  num_chunks,
206  num_elems,
207  static_cast<size_t>(elem_sz)};
208 }
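// The returned JoinColumn wraps col_chunks_buff, a CPU-malloced array holding one
// JoinChunk{col_buff, num_elems} per non-empty fragment, along with the total chunk
// count, total element count, and the fixed element size of the join key column.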
209 
210 const int8_t* ColumnFetcher::getOneTableColumnFragment(
211  const int table_id,
212  const int frag_id,
213  const int col_id,
214  const std::map<int, const TableFragments*>& all_tables_fragments,
215  std::list<std::shared_ptr<Chunk_NS::Chunk>>& chunk_holder,
216  std::list<ChunkIter>& chunk_iter_holder,
217  const Data_Namespace::MemoryLevel memory_level,
218  const int device_id,
219  DeviceAllocator* allocator) const {
220  const auto fragments_it = all_tables_fragments.find(table_id);
221  CHECK(fragments_it != all_tables_fragments.end());
222  const auto fragments = fragments_it->second;
223  const auto& fragment = (*fragments)[frag_id];
224  if (fragment.isEmptyPhysicalFragment()) {
225  return nullptr;
226  }
227  std::shared_ptr<Chunk_NS::Chunk> chunk;
228  auto chunk_meta_it = fragment.getChunkMetadataMap().find(col_id);
229  CHECK(chunk_meta_it != fragment.getChunkMetadataMap().end());
230  CHECK(table_id > 0);
231  const auto& cat = *executor_->getCatalog();
232  auto cd = get_column_descriptor(col_id, table_id, cat);
233  CHECK(cd);
234  const auto col_type =
235  get_column_type(col_id, table_id, cd, executor_->temporary_tables_);
236  const bool is_real_string =
237  col_type.is_string() && col_type.get_compression() == kENCODING_NONE;
238  const bool is_varlen =
239  is_real_string ||
240  col_type.is_array(); // TODO: should it be col_type.is_varlen_array() ?
241  {
242  ChunkKey chunk_key{
243  cat.getCurrentDB().dbId, fragment.physicalTableId, col_id, fragment.fragmentId};
244  std::unique_ptr<std::lock_guard<std::mutex>> varlen_chunk_lock;
245  if (is_varlen) {
246  varlen_chunk_lock.reset(new std::lock_guard<std::mutex>(varlen_chunk_fetch_mutex_));
247  }
248  chunk = Chunk_NS::Chunk::getChunk(
249  cd,
250  &cat.getDataMgr(),
251  chunk_key,
252  memory_level,
253  memory_level == Data_Namespace::CPU_LEVEL ? 0 : device_id,
254  chunk_meta_it->second->numBytes,
255  chunk_meta_it->second->numElements);
256  std::lock_guard<std::mutex> chunk_list_lock(chunk_list_mutex_);
257  chunk_holder.push_back(chunk);
258  }
259  if (is_varlen) {
260  CHECK_GT(table_id, 0);
261  CHECK(chunk_meta_it != fragment.getChunkMetadataMap().end());
262  chunk_iter_holder.push_back(chunk->begin_iterator(chunk_meta_it->second));
263  auto& chunk_iter = chunk_iter_holder.back();
264  if (memory_level == Data_Namespace::CPU_LEVEL) {
265  return reinterpret_cast<int8_t*>(&chunk_iter);
266  } else {
267  auto ab = chunk->getBuffer();
268  ab->pin();
269  auto& row_set_mem_owner = executor_->getRowSetMemoryOwner();
270  row_set_mem_owner->addVarlenInputBuffer(ab);
271  CHECK_EQ(Data_Namespace::GPU_LEVEL, memory_level);
272  CHECK(allocator);
273  auto chunk_iter_gpu = allocator->alloc(sizeof(ChunkIter));
274  allocator->copyToDevice(
275  chunk_iter_gpu, reinterpret_cast<int8_t*>(&chunk_iter), sizeof(ChunkIter));
276  return chunk_iter_gpu;
277  }
278  } else {
279  auto ab = chunk->getBuffer();
280  CHECK(ab->getMemoryPtr());
281  return ab->getMemoryPtr(); // @TODO(alex) change to use ChunkIter
282  }
283 }
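// Return value convention: for variable-length columns (none-encoded strings and arrays)
// this returns a pointer to a ChunkIter -- the host-side iterator for CPU execution, or a
// device copy of that iterator for GPU execution -- while for fixed-width columns it
// returns the chunk's raw data buffer.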
284 
285 const int8_t* ColumnFetcher::getAllTableColumnFragments(
286  const int table_id,
287  const int col_id,
288  const std::map<int, const TableFragments*>& all_tables_fragments,
289  const Data_Namespace::MemoryLevel memory_level,
290  const int device_id,
291  DeviceAllocator* device_allocator,
292  const size_t thread_idx) const {
293  const auto fragments_it = all_tables_fragments.find(table_id);
294  CHECK(fragments_it != all_tables_fragments.end());
295  const auto fragments = fragments_it->second;
296  const auto frag_count = fragments->size();
297  std::vector<std::unique_ptr<ColumnarResults>> column_frags;
298  const ColumnarResults* table_column = nullptr;
299  const InputColDescriptor col_desc(col_id, table_id, int(0));
301  {
302  std::lock_guard<std::mutex> columnar_conversion_guard(columnar_fetch_mutex_);
303  auto column_it = columnarized_scan_table_cache_.find(col_desc);
304  if (column_it == columnarized_scan_table_cache_.end()) {
305  for (size_t frag_id = 0; frag_id < frag_count; ++frag_id) {
306  if (g_enable_non_kernel_time_query_interrupt &&
307  executor_->checkNonKernelTimeInterrupted()) {
308  throw QueryExecutionError(Executor::ERR_INTERRUPTED);
309  }
310  std::list<std::shared_ptr<Chunk_NS::Chunk>> chunk_holder;
311  std::list<ChunkIter> chunk_iter_holder;
312  const auto& fragment = (*fragments)[frag_id];
313  if (fragment.isEmptyPhysicalFragment()) {
314  continue;
315  }
316  auto chunk_meta_it = fragment.getChunkMetadataMap().find(col_id);
317  CHECK(chunk_meta_it != fragment.getChunkMetadataMap().end());
318  auto col_buffer = getOneTableColumnFragment(table_id,
319  static_cast<int>(frag_id),
320  col_id,
321  all_tables_fragments,
322  chunk_holder,
323  chunk_iter_holder,
324  Data_Namespace::CPU_LEVEL,
325  int(0),
326  device_allocator);
327  column_frags.push_back(
328  std::make_unique<ColumnarResults>(executor_->row_set_mem_owner_,
329  col_buffer,
330  fragment.getNumTuples(),
331  chunk_meta_it->second->sqlType,
332  executor_->executor_id_,
333  thread_idx));
334  }
335  auto merged_results =
336  ColumnarResults::mergeResults(executor_->row_set_mem_owner_, column_frags);
337  table_column = merged_results.get();
338  columnarized_scan_table_cache_.emplace(col_desc, std::move(merged_results));
339  } else {
340  table_column = column_it->second.get();
341  }
342  }
343  return ColumnFetcher::transferColumnIfNeeded(table_column,
344  0,
345  executor_->getDataMgr(),
346  memory_level,
347  device_id,
348  device_allocator);
349 }
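// Unlike getOneTableColumnFragment(), this path columnarizes every fragment of the column
// on CPU, merges them into a single ColumnarResults (cached per InputColDescriptor in
// columnarized_scan_table_cache_), and only then transfers the merged buffer to the GPU if
// required.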
350 
351 const int8_t* ColumnFetcher::getResultSetColumn(
352  const InputColDescriptor* col_desc,
353  const Data_Namespace::MemoryLevel memory_level,
354  const int device_id,
355  DeviceAllocator* device_allocator,
356  const size_t thread_idx) const {
357  CHECK(col_desc);
358  const auto table_id = col_desc->getScanDesc().getTableId();
359  return getResultSetColumn(get_temporary_table(executor_->temporary_tables_, table_id),
360  table_id,
361  col_desc->getColId(),
362  memory_level,
363  device_id,
364  device_allocator,
365  thread_idx);
366 }
367 
368 const int8_t* ColumnFetcher::linearizeColumnFragments(
369  const int table_id,
370  const int col_id,
371  const std::map<int, const TableFragments*>& all_tables_fragments,
372  std::list<std::shared_ptr<Chunk_NS::Chunk>>& chunk_holder,
373  std::list<ChunkIter>& chunk_iter_holder,
374  const Data_Namespace::MemoryLevel memory_level,
375  const int device_id,
376  DeviceAllocator* device_allocator,
377  const size_t thread_idx) const {
378  auto timer = DEBUG_TIMER(__func__);
379  const auto fragments_it = all_tables_fragments.find(table_id);
380  CHECK(fragments_it != all_tables_fragments.end());
381  const auto fragments = fragments_it->second;
382  const auto frag_count = fragments->size();
383  const InputColDescriptor col_desc(col_id, table_id, int(0));
384  const auto& cat = *executor_->getCatalog();
385  auto cd = get_column_descriptor(col_id, table_id, cat);
386  CHECK(cd);
388  CHECK_GT(table_id, 0);
389  bool is_varlen_chunk = cd->columnType.is_varlen() && !cd->columnType.is_fixlen_array();
390  size_t total_num_tuples = 0;
391  size_t total_data_buf_size = 0;
392  size_t total_idx_buf_size = 0;
393  {
394  std::lock_guard<std::mutex> linearize_guard(linearized_col_cache_mutex_);
395  auto linearized_iter_it = linearized_multi_frag_chunk_iter_cache_.find(col_desc);
396  if (linearized_iter_it != linearized_multi_frag_chunk_iter_cache_.end()) {
397  if (memory_level == CPU_LEVEL) {
398  // in CPU execution, each kernel can share the merged chunk since they operate in
399  // the same memory space, so we can share the same chunk iter among kernels
400  return getChunkiter(col_desc, 0);
401  } else {
402  // in GPU execution, this becomes an issue when we deploy multiple GPUs,
403  // so we only share the chunk_iter iff kernels are launched on the same GPU device;
404  // otherwise we need to separately load the merged chunk and its iter
405  // todo(yoonmin): D2D copy of merged chunk and its iter?
406  if (linearized_iter_it->second.find(device_id) !=
407  linearized_iter_it->second.end()) {
408  // note that cached chunk_iter is located on CPU memory space...
409  // we just need to copy it to each device
410  // chunk_iter already contains correct buffer addr depending on execution device
411  auto chunk_iter_gpu = device_allocator->alloc(sizeof(ChunkIter));
412  device_allocator->copyToDevice(
413  chunk_iter_gpu, getChunkiter(col_desc, device_id), sizeof(ChunkIter));
414  return chunk_iter_gpu;
415  }
416  }
417  }
418  }
419 
420  // collect target fragments
421  // basically we load each chunk at CPU level first, and do the necessary manipulation
422  // to make the semantics of the merged chunk correct
423  std::shared_ptr<Chunk_NS::Chunk> chunk;
424  std::list<std::shared_ptr<Chunk_NS::Chunk>> local_chunk_holder;
425  std::list<ChunkIter> local_chunk_iter_holder;
426  std::list<size_t> local_chunk_num_tuples;
427  {
428  std::lock_guard<std::mutex> linearize_guard(varlen_chunk_fetch_mutex_);
429  for (size_t frag_id = 0; frag_id < frag_count; ++frag_id) {
430  const auto& fragment = (*fragments)[frag_id];
431  if (fragment.isEmptyPhysicalFragment()) {
432  continue;
433  }
434  auto chunk_meta_it = fragment.getChunkMetadataMap().find(col_id);
435  CHECK(chunk_meta_it != fragment.getChunkMetadataMap().end());
436  ChunkKey chunk_key{
437  cat.getCurrentDB().dbId, fragment.physicalTableId, col_id, fragment.fragmentId};
438  chunk = Chunk_NS::Chunk::getChunk(cd,
439  &cat.getDataMgr(),
440  chunk_key,
441  Data_Namespace::CPU_LEVEL,
442  0,
443  chunk_meta_it->second->numBytes,
444  chunk_meta_it->second->numElements);
445  local_chunk_holder.push_back(chunk);
446  auto chunk_iter = chunk->begin_iterator(chunk_meta_it->second);
447  local_chunk_iter_holder.push_back(chunk_iter);
448  local_chunk_num_tuples.push_back(fragment.getNumTuples());
449  total_num_tuples += fragment.getNumTuples();
450  total_data_buf_size += chunk->getBuffer()->size();
451  std::ostringstream oss;
452  oss << "Load chunk for col_name: " << chunk->getColumnDesc()->columnName
453  << ", col_id: " << chunk->getColumnDesc()->columnId << ", Frag-" << frag_id
454  << ", numTuples: " << fragment.getNumTuples()
455  << ", data_size: " << chunk->getBuffer()->size();
456  if (chunk->getIndexBuf()) {
457  auto idx_buf_size = chunk->getIndexBuf()->size() - sizeof(ArrayOffsetT);
458  oss << ", index_size: " << idx_buf_size;
459  total_idx_buf_size += idx_buf_size;
460  }
461  VLOG(2) << oss.str();
462  }
463  }
464 
465  auto& col_ti = cd->columnType;
466  MergedChunk res{nullptr, nullptr};
467  // Do linearize multi-fragmented column depending on column type
468  // We cover array and non-encoded text columns
469  // Note that geo column is actually organized as a set of arrays
470  // and each geo object has different set of vectors that they require
471  // Here, we linearize each array at a time, so eventually the geo object has a set of
472  // "linearized" arrays
473  {
474  std::lock_guard<std::mutex> linearization_guard(linearization_mutex_);
475  if (col_ti.is_array()) {
476  if (col_ti.is_fixlen_array()) {
477  VLOG(2) << "Linearize fixed-length multi-frag array column (col_id: "
478  << cd->columnId << ", col_name: " << cd->columnName
479  << ", device_type: " << getMemoryLevelString(memory_level)
480  << ", device_id: " << device_id << "): " << cd->columnType.to_string();
481  res = linearizeFixedLenArrayColFrags(cat,
482  chunk_holder,
483  chunk_iter_holder,
484  local_chunk_holder,
485  local_chunk_iter_holder,
486  local_chunk_num_tuples,
487  memory_level,
488  cd,
489  device_id,
490  total_data_buf_size,
491  total_idx_buf_size,
492  total_num_tuples,
493  device_allocator,
494  thread_idx);
495  } else {
496  CHECK(col_ti.is_varlen_array());
497  VLOG(2) << "Linearize variable-length multi-frag array column (col_id: "
498  << cd->columnId << ", col_name: " << cd->columnName
499  << ", device_type: " << getMemoryLevelString(memory_level)
500  << ", device_id: " << device_id << "): " << cd->columnType.to_string();
501  res = linearizeVarLenArrayColFrags(cat,
502  chunk_holder,
503  chunk_iter_holder,
504  local_chunk_holder,
505  local_chunk_iter_holder,
506  local_chunk_num_tuples,
507  memory_level,
508  cd,
509  device_id,
510  total_data_buf_size,
511  total_idx_buf_size,
512  total_num_tuples,
513  device_allocator,
514  thread_idx);
515  }
516  }
517  if (col_ti.is_string() && !col_ti.is_dict_encoded_string()) {
518  VLOG(2) << "Linearize variable-length multi-frag non-encoded text column (col_id: "
519  << cd->columnId << ", col_name: " << cd->columnName
520  << ", device_type: " << getMemoryLevelString(memory_level)
521  << ", device_id: " << device_id << "): " << cd->columnType.to_string();
522  res = linearizeVarLenArrayColFrags(cat,
523  chunk_holder,
524  chunk_iter_holder,
525  local_chunk_holder,
526  local_chunk_iter_holder,
527  local_chunk_num_tuples,
528  memory_level,
529  cd,
530  device_id,
531  total_data_buf_size,
532  total_idx_buf_size,
533  total_num_tuples,
534  device_allocator,
535  thread_idx);
536  }
537  }
538  CHECK(res.first); // check merged data buffer
539  if (!col_ti.is_fixlen_array()) {
540  CHECK(res.second); // check merged index buffer
541  }
542  auto merged_data_buffer = res.first;
543  auto merged_index_buffer = res.second;
544 
545  // prepare ChunkIter for the linearized chunk
546  auto merged_chunk = std::make_shared<Chunk_NS::Chunk>(
547  merged_data_buffer, merged_index_buffer, cd, false);
548  // to prepare chunk_iter for the merged chunk, we pass one of the local chunk iters
549  // to fill the metadata that is common to all merged chunks
550  auto merged_chunk_iter = prepareChunkIter(merged_data_buffer,
551  merged_index_buffer,
552  *(local_chunk_iter_holder.rbegin()),
553  is_varlen_chunk,
554  total_num_tuples);
555  {
556  std::lock_guard<std::mutex> chunk_list_lock(chunk_list_mutex_);
557  chunk_holder.push_back(merged_chunk);
558  chunk_iter_holder.push_back(merged_chunk_iter);
559  }
560 
561  auto merged_chunk_iter_ptr = reinterpret_cast<int8_t*>(&(chunk_iter_holder.back()));
562  if (memory_level == MemoryLevel::CPU_LEVEL) {
563  addMergedChunkIter(col_desc, 0, merged_chunk_iter_ptr);
564  return merged_chunk_iter_ptr;
565  } else {
566  CHECK_EQ(Data_Namespace::GPU_LEVEL, memory_level);
567  CHECK(device_allocator);
568  addMergedChunkIter(col_desc, device_id, merged_chunk_iter_ptr);
569  // note that merged_chunk_iter_ptr resides in CPU memory space;
570  // its contents point at the GPU buffer that we allocated for merging,
571  // so we need to copy this chunk_iter to each device explicitly
572  auto chunk_iter_gpu = device_allocator->alloc(sizeof(ChunkIter));
573  device_allocator->copyToDevice(
574  chunk_iter_gpu, merged_chunk_iter_ptr, sizeof(ChunkIter));
575  return chunk_iter_gpu;
576  }
577 }
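// Overall flow of linearizeColumnFragments(): load every fragment's chunk at CPU level,
// linearize the data buffer (and, for variable-length types, the index buffer) into
// per-device merged buffers, build a ChunkIter over the merged buffers, cache it per
// device, and hand back either the host iterator (CPU) or a device copy of it (GPU).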
578 
579 MergedChunk ColumnFetcher::linearizeVarLenArrayColFrags(
580  const Catalog_Namespace::Catalog& cat,
581  std::list<std::shared_ptr<Chunk_NS::Chunk>>& chunk_holder,
582  std::list<ChunkIter>& chunk_iter_holder,
583  std::list<std::shared_ptr<Chunk_NS::Chunk>>& local_chunk_holder,
584  std::list<ChunkIter>& local_chunk_iter_holder,
585  std::list<size_t>& local_chunk_num_tuples,
586  MemoryLevel memory_level,
587  const ColumnDescriptor* cd,
588  const int device_id,
589  const size_t total_data_buf_size,
590  const size_t total_idx_buf_size,
591  const size_t total_num_tuples,
592  DeviceAllocator* device_allocator,
593  const size_t thread_idx) const {
594  // for linearization of a varlen col we have to deal with not only the data buffer
595  // but also its underlying index buffer, which holds the offset of each varlen value
596  // basically we maintain a per-device linearized (data/index) buffer
597  // for the data buffer, we linearize the varlen col's chunks within a device-specific
598  // buffer by just appending each chunk
599  // for the index buffer, we need to not only append each chunk but also modify the offset
600  // values to reflect various conditions like nullness, padding and so on, so we first
601  // append the index buffer on CPU, manipulate it as required, and then copy it to the
602  // specific device if necessary (for GPU execution)
603  AbstractBuffer* merged_index_buffer_in_cpu = nullptr;
604  AbstractBuffer* merged_data_buffer = nullptr;
605  bool has_cached_merged_idx_buf = false;
606  bool has_cached_merged_data_buf = false;
607  const InputColDescriptor icd(cd->columnId, cd->tableId, int(0));
608  // check linearized buffer's cache first
609  // if not exists, alloc necessary buffer space to prepare linearization
610  int64_t linearization_time_ms = 0;
611  auto clock_begin = timer_start();
612  {
613  std::lock_guard<std::mutex> linearized_col_cache_guard(linearized_col_cache_mutex_);
614  auto cached_data_buf_cache_it = linearized_data_buf_cache_.find(icd);
615  if (cached_data_buf_cache_it != linearized_data_buf_cache_.end()) {
616  auto& cd_cache = cached_data_buf_cache_it->second;
617  auto cached_data_buf_it = cd_cache.find(device_id);
618  if (cached_data_buf_it != cd_cache.end()) {
619  has_cached_merged_data_buf = true;
620  merged_data_buffer = cached_data_buf_it->second;
621  VLOG(2) << "Recycle merged data buffer for linearized chunks (memory_level: "
622  << getMemoryLevelString(memory_level) << ", device_id: " << device_id
623  << ")";
624  } else {
625  merged_data_buffer =
626  cat.getDataMgr().alloc(memory_level, device_id, total_data_buf_size);
627  VLOG(2) << "Allocate " << total_data_buf_size
628  << " bytes of data buffer space for linearized chunks (memory_level: "
629  << getMemoryLevelString(memory_level) << ", device_id: " << device_id
630  << ")";
631  cd_cache.insert(std::make_pair(device_id, merged_data_buffer));
632  }
633  } else {
634  DeviceMergedChunkMap m;
635  merged_data_buffer =
636  cat.getDataMgr().alloc(memory_level, device_id, total_data_buf_size);
637  VLOG(2) << "Allocate " << total_data_buf_size
638  << " bytes of data buffer space for linearized chunks (memory_level: "
639  << getMemoryLevelString(memory_level) << ", device_id: " << device_id
640  << ")";
641  m.insert(std::make_pair(device_id, merged_data_buffer));
642  linearized_data_buf_cache_.insert(std::make_pair(icd, m));
643  }
644 
645  auto cached_index_buf_it =
646  linearlized_temporary_cpu_index_buf_cache_.find(cd->columnId);
647  if (cached_index_buf_it != linearlized_temporary_cpu_index_buf_cache_.end()) {
648  has_cached_merged_idx_buf = true;
649  merged_index_buffer_in_cpu = cached_index_buf_it->second;
650  VLOG(2)
651  << "Recycle merged temporary idx buffer for linearized chunks (memory_level: "
652  << getMemoryLevelString(memory_level) << ", device_id: " << device_id << ")";
653  } else {
654  auto idx_buf_size = total_idx_buf_size + sizeof(ArrayOffsetT);
655  merged_index_buffer_in_cpu =
656  cat.getDataMgr().alloc(Data_Namespace::CPU_LEVEL, 0, idx_buf_size);
657  VLOG(2) << "Allocate " << idx_buf_size
658  << " bytes of temporary idx buffer space on CPU for linearized chunks";
659  // just copy the buf addr since we access it via the pointer itself
660  linearlized_temporary_cpu_index_buf_cache_.insert(
661  std::make_pair(cd->columnId, merged_index_buffer_in_cpu));
662  }
663  }
664 
665  // linearize buffers if we don't have corresponding buf in cache
666  size_t sum_data_buf_size = 0;
667  size_t cur_sum_num_tuples = 0;
668  size_t total_idx_size_modifier = 0;
669  auto chunk_holder_it = local_chunk_holder.begin();
670  auto chunk_iter_holder_it = local_chunk_iter_holder.begin();
671  auto chunk_num_tuple_it = local_chunk_num_tuples.begin();
672  bool null_padded_first_elem = false;
673  bool null_padded_last_val = false;
674  // before entering the actual linearization part, we first need to check for
675  // the overflow case where the sum of index offsets becomes larger than 2GB,
676  // which currently causes incorrect query results due to negative array offsets
677  // note that we can separate this from the main linearization logic b/c
678  // we just need to look at the last few elems of each chunk
679  // todo (yoonmin) : relax this to support larger chunk size (>2GB)
680  for (; chunk_holder_it != local_chunk_holder.end();
681  chunk_holder_it++, chunk_num_tuple_it++) {
682  // check the offset overflow based on the last "valid" offset for each chunk
683  auto target_chunk = chunk_holder_it->get();
684  auto target_chunk_data_buffer = target_chunk->getBuffer();
685  auto target_chunk_idx_buffer = target_chunk->getIndexBuf();
686  auto target_idx_buf_ptr =
687  reinterpret_cast<ArrayOffsetT*>(target_chunk_idx_buffer->getMemoryPtr());
688  auto cur_chunk_num_tuples = *chunk_num_tuple_it;
689  ArrayOffsetT original_offset = -1;
690  size_t cur_idx = cur_chunk_num_tuples;
691  // find the valid (e.g., non-null) offset starting from the last elem
692  while (original_offset < 0) {
693  original_offset = target_idx_buf_ptr[--cur_idx];
694  }
695  ArrayOffsetT new_offset = original_offset + sum_data_buf_size;
696  if (new_offset < 0) {
697  throw std::runtime_error(
698  "Linearization of a variable-length column having chunk size larger than 2GB "
699  "not supported yet");
700  }
701  sum_data_buf_size += target_chunk_data_buffer->size();
702  }
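// Illustrative example of the pre-check above (numbers are hypothetical): with 32-bit
// ArrayOffsetT, if earlier chunks already contribute 2,147,000,000 bytes of data and the
// next chunk's last valid offset is 1,000,000, the shifted offset would exceed INT32_MAX
// and wrap negative, so linearization is rejected before any buffer is modified.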
703  chunk_holder_it = local_chunk_holder.begin();
704  chunk_num_tuple_it = local_chunk_num_tuples.begin();
705  sum_data_buf_size = 0;
706 
707  for (; chunk_holder_it != local_chunk_holder.end();
708  chunk_holder_it++, chunk_iter_holder_it++, chunk_num_tuple_it++) {
709  if (g_enable_non_kernel_time_query_interrupt &&
710  executor_->checkNonKernelTimeInterrupted()) {
711  throw QueryExecutionError(Executor::ERR_INTERRUPTED);
712  }
713  auto target_chunk = chunk_holder_it->get();
714  auto target_chunk_data_buffer = target_chunk->getBuffer();
715  auto cur_chunk_num_tuples = *chunk_num_tuple_it;
716  auto target_chunk_idx_buffer = target_chunk->getIndexBuf();
717  auto target_idx_buf_ptr =
718  reinterpret_cast<ArrayOffsetT*>(target_chunk_idx_buffer->getMemoryPtr());
719  auto idx_buf_size = target_chunk_idx_buffer->size() - sizeof(ArrayOffsetT);
720  auto target_data_buffer_start_ptr = target_chunk_data_buffer->getMemoryPtr();
721  auto target_data_buffer_size = target_chunk_data_buffer->size();
722 
723  // when linearizing idx buffers, we need to consider the following cases
724  // 1. the first idx val is padded (a. null / b. empty varlen arr / c. 1-byte size
725  // varlen arr, i.e., {1})
726  // 2. the last idx val is null
727  // 3. null value(s) is/are located in a middle of idx buf <-- we don't need to care
728  if (cur_sum_num_tuples > 0 && target_idx_buf_ptr[0] > 0) {
729  null_padded_first_elem = true;
730  target_data_buffer_start_ptr += ArrayNoneEncoder::DEFAULT_NULL_PADDING_SIZE;
731  target_data_buffer_size -= ArrayNoneEncoder::DEFAULT_NULL_PADDING_SIZE;
732  total_idx_size_modifier += ArrayNoneEncoder::DEFAULT_NULL_PADDING_SIZE;
733  }
734  // we linearize data_buf in device-specific buffer
735  if (!has_cached_merged_data_buf) {
736  merged_data_buffer->append(target_data_buffer_start_ptr,
737  target_data_buffer_size,
738  Data_Namespace::CPU_LEVEL,
739  device_id);
740  }
741 
742  if (!has_cached_merged_idx_buf) {
743  // linearize idx buf in CPU first
744  merged_index_buffer_in_cpu->append(target_chunk_idx_buffer->getMemoryPtr(),
745  idx_buf_size,
746  Data_Namespace::CPU_LEVEL,
747  0); // merged_index_buffer_in_cpu resides in CPU
748  auto idx_buf_ptr =
749  reinterpret_cast<ArrayOffsetT*>(merged_index_buffer_in_cpu->getMemoryPtr());
750  // here, we do not need to manipulate the very first idx buf; just leave it as is
751  // and modify the rest (i.e., starting from the second chunk's idx buf)
752  if (cur_sum_num_tuples > 0) {
753  if (null_padded_last_val) {
754  // case 2. the previous chunk's last index val is null so we need to set this
755  // chunk's first val to be null
756  idx_buf_ptr[cur_sum_num_tuples] = -sum_data_buf_size;
757  }
758  const size_t worker_count = cpu_threads();
759  std::vector<std::future<void>> conversion_threads;
760  std::vector<std::vector<size_t>> null_padded_row_idx_vecs(worker_count,
761  std::vector<size_t>());
762  bool is_parallel_modification = false;
763  std::vector<size_t> null_padded_row_idx_vec;
764  const auto do_work = [&cur_sum_num_tuples,
765  &sum_data_buf_size,
766  &null_padded_first_elem,
767  &idx_buf_ptr](
768  const size_t start,
769  const size_t end,
770  const bool is_parallel_modification,
771  std::vector<size_t>* null_padded_row_idx_vec) {
772  for (size_t i = start; i < end; i++) {
773  if (LIKELY(idx_buf_ptr[cur_sum_num_tuples + i] >= 0)) {
774  if (null_padded_first_elem) {
775  // deal with null padded bytes
776  idx_buf_ptr[cur_sum_num_tuples + i] -=
777  ArrayNoneEncoder::DEFAULT_NULL_PADDING_SIZE;
778  }
779  idx_buf_ptr[cur_sum_num_tuples + i] += sum_data_buf_size;
780  } else {
781  // a null padded row needs to reference the previous row's idx, so in
782  // multi-threaded index modification we may suffer from thread
783  // contention when thread-i needs to reference thread-j's row idx; we
784  // therefore collect the row idxs of null rows here and deal with them
785  // after this step
786  null_padded_row_idx_vec->push_back(cur_sum_num_tuples + i);
787  }
788  }
789  };
790  if (cur_chunk_num_tuples > g_enable_parallel_linearization) {
791  is_parallel_modification = true;
792  for (auto interval :
793  makeIntervals(size_t(0), cur_chunk_num_tuples, worker_count)) {
794  conversion_threads.push_back(
796  do_work,
797  interval.begin,
798  interval.end,
799  is_parallel_modification,
800  &null_padded_row_idx_vecs[interval.index]));
801  }
802  for (auto& child : conversion_threads) {
803  child.wait();
804  }
805  for (auto& v : null_padded_row_idx_vecs) {
806  std::copy(v.begin(), v.end(), std::back_inserter(null_padded_row_idx_vec));
807  }
808  } else {
809  do_work(size_t(0),
810  cur_chunk_num_tuples,
811  is_parallel_modification,
812  &null_padded_row_idx_vec);
813  }
814  if (!null_padded_row_idx_vec.empty()) {
815  // modify null padded row idxs by referencing the previous row
816  // here we sort row idxs to correctly propagate modified row idxs
817  std::sort(null_padded_row_idx_vec.begin(), null_padded_row_idx_vec.end());
818  for (auto& padded_null_row_idx : null_padded_row_idx_vec) {
819  if (idx_buf_ptr[padded_null_row_idx - 1] > 0) {
820  idx_buf_ptr[padded_null_row_idx] = -idx_buf_ptr[padded_null_row_idx - 1];
821  } else {
822  idx_buf_ptr[padded_null_row_idx] = idx_buf_ptr[padded_null_row_idx - 1];
823  }
824  }
825  }
826  }
827  }
828  cur_sum_num_tuples += cur_chunk_num_tuples;
829  sum_data_buf_size += target_chunk_data_buffer->size();
830  if (target_idx_buf_ptr[*chunk_num_tuple_it] < 0) {
831  null_padded_last_val = true;
832  } else {
833  null_padded_last_val = false;
834  }
835  if (null_padded_first_elem) {
836  sum_data_buf_size -= ArrayNoneEncoder::DEFAULT_NULL_PADDING_SIZE;
837  null_padded_first_elem = false; // set for the next chunk
838  }
839  if (!has_cached_merged_idx_buf && cur_sum_num_tuples == total_num_tuples) {
840  auto merged_index_buffer_ptr =
841  reinterpret_cast<ArrayOffsetT*>(merged_index_buffer_in_cpu->getMemoryPtr());
842  merged_index_buffer_ptr[total_num_tuples] =
843  total_data_buf_size -
844  total_idx_size_modifier; // last index value is total data size;
845  }
846  }
847 
848  // put linearized index buffer to per-device cache
849  AbstractBuffer* merged_index_buffer = nullptr;
850  size_t buf_size = total_idx_buf_size + sizeof(ArrayOffsetT);
851  auto copyBuf =
852  [&device_allocator](
853  int8_t* src, int8_t* dest, size_t buf_size, MemoryLevel memory_level) {
854  if (memory_level == Data_Namespace::CPU_LEVEL) {
855  memcpy((void*)dest, src, buf_size);
856  } else {
857  CHECK(memory_level == Data_Namespace::GPU_LEVEL);
858  device_allocator->copyToDevice(dest, src, buf_size);
859  }
860  };
861  {
862  std::lock_guard<std::mutex> linearized_col_cache_guard(linearized_col_cache_mutex_);
863  auto merged_idx_buf_cache_it = linearized_idx_buf_cache_.find(icd);
864  // for CPU execution, we can use `merged_index_buffer_in_cpu` as is
865  // but for GPU, we have to copy it to the corresponding device
866  if (memory_level == MemoryLevel::GPU_LEVEL) {
867  if (merged_idx_buf_cache_it != linearized_idx_buf_cache_.end()) {
868  auto& merged_idx_buf_cache = merged_idx_buf_cache_it->second;
869  auto merged_idx_buf_it = merged_idx_buf_cache.find(device_id);
870  if (merged_idx_buf_it != merged_idx_buf_cache.end()) {
871  merged_index_buffer = merged_idx_buf_it->second;
872  } else {
873  merged_index_buffer = cat.getDataMgr().alloc(memory_level, device_id, buf_size);
874  copyBuf(merged_index_buffer_in_cpu->getMemoryPtr(),
875  merged_index_buffer->getMemoryPtr(),
876  buf_size,
877  memory_level);
878  merged_idx_buf_cache.insert(std::make_pair(device_id, merged_index_buffer));
879  }
880  } else {
881  merged_index_buffer = cat.getDataMgr().alloc(memory_level, device_id, buf_size);
882  copyBuf(merged_index_buffer_in_cpu->getMemoryPtr(),
883  merged_index_buffer->getMemoryPtr(),
884  buf_size,
885  memory_level);
887  m.insert(std::make_pair(device_id, merged_index_buffer));
888  linearized_idx_buf_cache_.insert(std::make_pair(icd, m));
889  }
890  } else {
891  // `linearlized_temporary_cpu_index_buf_cache_` has this buf
892  merged_index_buffer = merged_index_buffer_in_cpu;
893  }
894  }
895  CHECK(merged_index_buffer);
896  linearization_time_ms += timer_stop(clock_begin);
897  VLOG(2) << "Linearization has been successfully done, elapsed time: "
898  << linearization_time_ms << " ms.";
899  return {merged_data_buffer, merged_index_buffer};
900 }
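// Offset adjustment in a nutshell: for every chunk after the first, each non-negative
// index entry is shifted by the number of data bytes appended so far, while negative
// (null) entries are rewritten afterwards to mirror the previous row's offset. For example
// (illustrative values), if the first chunk contributes 100 data bytes, an offset of 40 in
// the second chunk becomes 140 in the merged index buffer.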
901 
902 MergedChunk ColumnFetcher::linearizeFixedLenArrayColFrags(
903  const Catalog_Namespace::Catalog& cat,
904  std::list<std::shared_ptr<Chunk_NS::Chunk>>& chunk_holder,
905  std::list<ChunkIter>& chunk_iter_holder,
906  std::list<std::shared_ptr<Chunk_NS::Chunk>>& local_chunk_holder,
907  std::list<ChunkIter>& local_chunk_iter_holder,
908  std::list<size_t>& local_chunk_num_tuples,
909  MemoryLevel memory_level,
910  const ColumnDescriptor* cd,
911  const int device_id,
912  const size_t total_data_buf_size,
913  const size_t total_idx_buf_size,
914  const size_t total_num_tuples,
915  DeviceAllocator* device_allocator,
916  const size_t thread_idx) const {
917  int64_t linearization_time_ms = 0;
918  auto clock_begin = timer_start();
919  // linearize collected fragments
920  AbstractBuffer* merged_data_buffer = nullptr;
921  bool has_cached_merged_data_buf = false;
922  const InputColDescriptor icd(cd->columnId, cd->tableId, int(0));
923  {
924  std::lock_guard<std::mutex> linearized_col_cache_guard(linearized_col_cache_mutex_);
925  auto cached_data_buf_cache_it = linearized_data_buf_cache_.find(icd);
926  if (cached_data_buf_cache_it != linearized_data_buf_cache_.end()) {
927  auto& cd_cache = cached_data_buf_cache_it->second;
928  auto cached_data_buf_it = cd_cache.find(device_id);
929  if (cached_data_buf_it != cd_cache.end()) {
930  has_cached_merged_data_buf = true;
931  merged_data_buffer = cached_data_buf_it->second;
932  VLOG(2) << "Recycle merged data buffer for linearized chunks (memory_level: "
933  << getMemoryLevelString(memory_level) << ", device_id: " << device_id
934  << ")";
935  } else {
936  merged_data_buffer =
937  cat.getDataMgr().alloc(memory_level, device_id, total_data_buf_size);
938  VLOG(2) << "Allocate " << total_data_buf_size
939  << " bytes of data buffer space for linearized chunks (memory_level: "
940  << getMemoryLevelString(memory_level) << ", device_id: " << device_id
941  << ")";
942  cd_cache.insert(std::make_pair(device_id, merged_data_buffer));
943  }
944  } else {
945  DeviceMergedChunkMap m;
946  merged_data_buffer =
947  cat.getDataMgr().alloc(memory_level, device_id, total_data_buf_size);
948  VLOG(2) << "Allocate " << total_data_buf_size
949  << " bytes of data buffer space for linearized chunks (memory_level: "
950  << getMemoryLevelString(memory_level) << ", device_id: " << device_id
951  << ")";
952  m.insert(std::make_pair(device_id, merged_data_buffer));
953  linearized_data_buf_cache_.insert(std::make_pair(icd, m));
954  }
955  }
956  if (!has_cached_merged_data_buf) {
957  size_t sum_data_buf_size = 0;
958  auto chunk_holder_it = local_chunk_holder.begin();
959  auto chunk_iter_holder_it = local_chunk_iter_holder.begin();
960  for (; chunk_holder_it != local_chunk_holder.end();
961  chunk_holder_it++, chunk_iter_holder_it++) {
962  if (executor_->checkNonKernelTimeInterrupted()) {
963  throw QueryExecutionError(Executor::ERR_INTERRUPTED);
964  }
965  auto target_chunk = chunk_holder_it->get();
966  auto target_chunk_data_buffer = target_chunk->getBuffer();
967  merged_data_buffer->append(target_chunk_data_buffer->getMemoryPtr(),
968  target_chunk_data_buffer->size(),
969  Data_Namespace::CPU_LEVEL,
970  device_id);
971  sum_data_buf_size += target_chunk_data_buffer->size();
972  }
973  // check whether each chunk's data buffer is clean under chunk merging
974  CHECK_EQ(total_data_buf_size, sum_data_buf_size);
975  }
976  linearization_time_ms += timer_stop(clock_begin);
977  VLOG(2) << "Linearization has been successfully done, elapsed time: "
978  << linearization_time_ms << " ms.";
979  return {merged_data_buffer, nullptr};
980 }
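// Fixed-length arrays need no index buffer: every element occupies the same number of
// bytes, so linearization is a plain concatenation of the per-fragment data buffers,
// verified at the end by comparing the appended size against total_data_buf_size.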
981 
982 const int8_t* ColumnFetcher::transferColumnIfNeeded(
983  const ColumnarResults* columnar_results,
984  const int col_id,
985  Data_Namespace::DataMgr* data_mgr,
986  const Data_Namespace::MemoryLevel memory_level,
987  const int device_id,
988  DeviceAllocator* device_allocator) {
989  if (!columnar_results) {
990  return nullptr;
991  }
992  const auto& col_buffers = columnar_results->getColumnBuffers();
993  CHECK_LT(static_cast<size_t>(col_id), col_buffers.size());
994  if (memory_level == Data_Namespace::GPU_LEVEL) {
995  const auto& col_ti = columnar_results->getColumnType(col_id);
996  const auto num_bytes = columnar_results->size() * col_ti.get_size();
997  CHECK(device_allocator);
998  auto gpu_col_buffer = device_allocator->alloc(num_bytes);
999  device_allocator->copyToDevice(gpu_col_buffer, col_buffers[col_id], num_bytes);
1000  return gpu_col_buffer;
1001  }
1002  return col_buffers[col_id];
1003 }
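// transferColumnIfNeeded() is a pass-through for CPU execution (the host buffer is
// returned as-is); for GPU_LEVEL it allocates size() * element-size bytes through the
// DeviceAllocator, copies the column over, and returns the device pointer.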
1004 
1005 void ColumnFetcher::addMergedChunkIter(const InputColDescriptor col_desc,
1006  const int device_id,
1007  int8_t* chunk_iter_ptr) const {
1008  std::lock_guard<std::mutex> linearize_guard(linearized_col_cache_mutex_);
1009  auto chunk_iter_it = linearized_multi_frag_chunk_iter_cache_.find(col_desc);
1010  if (chunk_iter_it != linearized_multi_frag_chunk_iter_cache_.end()) {
1011  auto iter_device_it = chunk_iter_it->second.find(device_id);
1012  if (iter_device_it == chunk_iter_it->second.end()) {
1013  VLOG(2) << "Additional merged chunk_iter for col_desc (tbl: "
1014  << col_desc.getScanDesc().getTableId() << ", col: " << col_desc.getColId()
1015  << "), device_id: " << device_id;
1016  chunk_iter_it->second.emplace(device_id, chunk_iter_ptr);
1017  }
1018  } else {
1019  DeviceMergedChunkIterMap iter_m;
1020  iter_m.emplace(device_id, chunk_iter_ptr);
1021  VLOG(2) << "New merged chunk_iter for col_desc (tbl: "
1022  << col_desc.getScanDesc().getTableId() << ", col: " << col_desc.getColId()
1023  << "), device_id: " << device_id;
1024  linearized_multi_frag_chunk_iter_cache_.emplace(col_desc, iter_m);
1025  }
1026 }
1027 
1028 const int8_t* ColumnFetcher::getChunkiter(const InputColDescriptor col_desc,
1029  const int device_id) const {
1030  auto linearized_chunk_iter_it = linearized_multi_frag_chunk_iter_cache_.find(col_desc);
1031  if (linearized_chunk_iter_it != linearized_multi_frag_chunk_iter_cache_.end()) {
1032  auto dev_iter_map_it = linearized_chunk_iter_it->second.find(device_id);
1033  if (dev_iter_map_it != linearized_chunk_iter_it->second.end()) {
1034  VLOG(2) << "Recycle merged chunk_iter for col_desc (tbl: "
1035  << col_desc.getScanDesc().getTableId() << ", col: " << col_desc.getColId()
1036  << "), device_id: " << device_id;
1037  return dev_iter_map_it->second;
1038  }
1039  }
1040  return nullptr;
1041 }
1042 
1043 ChunkIter ColumnFetcher::prepareChunkIter(AbstractBuffer* merged_data_buf,
1044  AbstractBuffer* merged_index_buf,
1045  ChunkIter& chunk_iter,
1046  bool is_true_varlen_type,
1047  const size_t total_num_tuples) const {
1048  ChunkIter merged_chunk_iter;
1049  if (is_true_varlen_type) {
1050  merged_chunk_iter.start_pos = merged_index_buf->getMemoryPtr();
1051  merged_chunk_iter.current_pos = merged_index_buf->getMemoryPtr();
1052  merged_chunk_iter.end_pos = merged_index_buf->getMemoryPtr() +
1053  merged_index_buf->size() - sizeof(StringOffsetT);
1054  merged_chunk_iter.second_buf = merged_data_buf->getMemoryPtr();
1055  } else {
1056  merged_chunk_iter.start_pos = merged_data_buf->getMemoryPtr();
1057  merged_chunk_iter.current_pos = merged_data_buf->getMemoryPtr();
1058  merged_chunk_iter.end_pos = merged_data_buf->getMemoryPtr() + merged_data_buf->size();
1059  merged_chunk_iter.second_buf = nullptr;
1060  }
1061  merged_chunk_iter.num_elems = total_num_tuples;
1062  merged_chunk_iter.skip = chunk_iter.skip;
1063  merged_chunk_iter.skip_size = chunk_iter.skip_size;
1064  merged_chunk_iter.type_info = chunk_iter.type_info;
1065  return merged_chunk_iter;
1066 }
1067 
1068 void ColumnFetcher::freeLinearizedBuf() {
1069  std::lock_guard<std::mutex> linearized_col_cache_guard(linearized_col_cache_mutex_);
1070  const auto& cat = *executor_->getCatalog();
1071  auto& data_mgr = cat.getDataMgr();
1072 
1073  if (!linearized_data_buf_cache_.empty()) {
1074  for (auto& kv : linearized_data_buf_cache_) {
1075  for (auto& kv2 : kv.second) {
1076  data_mgr.free(kv2.second);
1077  }
1078  }
1079  }
1080 
1081  if (!linearized_idx_buf_cache_.empty()) {
1082  for (auto& kv : linearized_idx_buf_cache_) {
1083  for (auto& kv2 : kv.second) {
1084  data_mgr.free(kv2.second);
1085  }
1086  }
1087  }
1088 }
1089 
1090 void ColumnFetcher::freeTemporaryCpuLinearizedIdxBuf() {
1091  std::lock_guard<std::mutex> linearized_col_cache_guard(linearized_col_cache_mutex_);
1092  const auto& cat = *executor_->getCatalog();
1093  auto& data_mgr = cat.getDataMgr();
1094  if (!linearlized_temporary_cpu_index_buf_cache_.empty()) {
1095  for (auto& kv : linearlized_temporary_cpu_index_buf_cache_) {
1096  data_mgr.free(kv.second);
1097  }
1098  }
1099 }
1100 
1101 const int8_t* ColumnFetcher::getResultSetColumn(
1102  const ResultSetPtr& buffer,
1103  const int table_id,
1104  const int col_id,
1105  const Data_Namespace::MemoryLevel memory_level,
1106  const int device_id,
1107  DeviceAllocator* device_allocator,
1108  const size_t thread_idx) const {
1109  const ColumnarResults* result{nullptr};
1110  {
1111  std::lock_guard<std::mutex> columnar_conversion_guard(columnar_fetch_mutex_);
1112  if (columnarized_table_cache_.empty() || !columnarized_table_cache_.count(table_id)) {
1113  columnarized_table_cache_.insert(std::make_pair(
1114  table_id, std::unordered_map<int, std::shared_ptr<const ColumnarResults>>()));
1115  }
1116  auto& frag_id_to_result = columnarized_table_cache_[table_id];
1117  int frag_id = 0;
1118  if (frag_id_to_result.empty() || !frag_id_to_result.count(frag_id)) {
1119  frag_id_to_result.insert(
1120  std::make_pair(frag_id,
1121  std::shared_ptr<const ColumnarResults>(
1122  columnarize_result(executor_->row_set_mem_owner_,
1123  buffer,
1124  executor_->executor_id_,
1125  thread_idx,
1126  frag_id))));
1127  }
1128  CHECK_NE(size_t(0), columnarized_table_cache_.count(table_id));
1129  result = columnarized_table_cache_[table_id][frag_id].get();
1130  }
1131  CHECK_GE(col_id, 0);
1132  return transferColumnIfNeeded(
1133  result, col_id, executor_->getDataMgr(), memory_level, device_id, device_allocator);
1134 }
int get_table_id() const
Definition: Analyzer.h:201
#define CHECK_EQ(x, y)
Definition: Logger.h:230
std::vector< int > ChunkKey
Definition: types.h:36
int8_t * start_pos
Definition: ChunkIter.h:34
HOST DEVICE int get_size() const
Definition: sqltypes.h:389
std::unordered_map< int, int8_t * > DeviceMergedChunkIterMap
std::string cat(Ts &&...args)
int8_t * current_pos
Definition: ChunkIter.h:33
SQLTypeInfo type_info
Definition: ChunkIter.h:31
std::unordered_map< int, AbstractBuffer * > DeviceMergedChunkMap
static const int32_t ERR_INTERRUPTED
Definition: Execute.h:1388
class for a per-database catalog. also includes metadata for the current database and the current user.
Definition: Catalog.h:132
static std::unique_ptr< ColumnarResults > mergeResults(const std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner, const std::vector< std::unique_ptr< ColumnarResults >> &sub_results)
std::mutex varlen_chunk_fetch_mutex_
Data_Namespace::DataMgr & getDataMgr() const
Definition: Catalog.h:243
void freeLinearizedBuf()
static JoinColumn makeJoinColumn(Executor *executor, const Analyzer::ColumnVar &hash_col, const std::vector< Fragmenter_Namespace::FragmentInfo > &fragments, const Data_Namespace::MemoryLevel effective_mem_lvl, const int device_id, DeviceAllocator *device_allocator, const size_t thread_idx, std::vector< std::shared_ptr< Chunk_NS::Chunk >> &chunks_owner, std::vector< std::shared_ptr< void >> &malloc_owner, ColumnCacheMap &column_cache)
Creates a JoinColumn struct containing an array of JoinChunk structs.
virtual int8_t * getMemoryPtr()=0
std::mutex columnar_fetch_mutex_
DEVICE void sort(ARGS &&...args)
Definition: gpu_enabled.h:105
const SQLTypeInfo get_column_type(const int col_id, const int table_id, const ColumnDescriptor *cd, const TemporaryTables *temporary_tables)
Definition: Execute.h:236
#define CHECK_GE(x, y)
Definition: Logger.h:235
const int8_t * getResultSetColumn(const InputColDescriptor *col_desc, const Data_Namespace::MemoryLevel memory_level, const int device_id, DeviceAllocator *device_allocator, const size_t thread_idx) const
SQLTypeInfo get_logical_type_info(const SQLTypeInfo &type_info)
Definition: sqltypes.h:1198
TypeR::rep timer_stop(Type clock_begin)
Definition: measure.h:48
virtual int8_t * alloc(const size_t num_bytes)=0
ColumnCacheMap columnarized_table_cache_
Constants for Builtin SQL Types supported by HEAVY.AI.
const int8_t * linearizeColumnFragments(const int table_id, const int col_id, const std::map< int, const TableFragments * > &all_tables_fragments, std::list< std::shared_ptr< Chunk_NS::Chunk >> &chunk_holder, std::list< ChunkIter > &chunk_iter_holder, const Data_Namespace::MemoryLevel memory_level, const int device_id, DeviceAllocator *device_allocator, const size_t thread_idx) const
std::shared_ptr< ResultSet > ResultSetPtr
const int8_t * getChunkiter(const InputColDescriptor col_desc, const int device_id=0) const
Executor * executor_
Intervals< T > makeIntervals(T begin, T end, std::size_t n_workers)
Definition: Intervals.h:122
bool g_enable_non_kernel_time_query_interrupt
Definition: Execute.cpp:126
const int8_t * col_buff
std::mutex linearization_mutex_
#define CHECK_GT(x, y)
Definition: Logger.h:234
const ColumnarResults * columnarize_result(std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner, const ResultSetPtr &result, const size_t thread_idx, const size_t executor_id, const int frag_id)
std::unordered_map< InputColDescriptor, std::unique_ptr< const ColumnarResults > > columnarized_scan_table_cache_
int32_t StringOffsetT
Definition: sqltypes.h:1224
const int8_t * getOneTableColumnFragment(const int table_id, const int frag_id, const int col_id, const std::map< int, const TableFragments * > &all_tables_fragments, std::list< std::shared_ptr< Chunk_NS::Chunk >> &chunk_holder, std::list< ChunkIter > &chunk_iter_holder, const Data_Namespace::MemoryLevel memory_level, const int device_id, DeviceAllocator *device_allocator) const
__device__ bool check_interrupt()
std::unordered_map< InputColDescriptor, DeviceMergedChunkMap > linearized_data_buf_cache_
const ResultSetPtr & get_temporary_table(const TemporaryTables *temporary_tables, const int table_id)
Definition: Execute.h:228
void addMergedChunkIter(const InputColDescriptor col_desc, const int device_id, int8_t *chunk_iter_ptr) const
std::unordered_map< int, AbstractBuffer * > linearlized_temporary_cpu_index_buf_cache_
virtual void copyToDevice(void *device_dst, const void *host_src, const size_t num_bytes) const =0
future< Result > async(Fn &&fn, Args &&...args)
int8_t * end_pos
Definition: ChunkIter.h:35
size_t num_elems
Definition: ChunkIter.h:38
const ColumnDescriptor * get_column_descriptor_maybe(const int col_id, const int table_id, const Catalog_Namespace::Catalog &cat)
Definition: Execute.h:220
void * checked_malloc(const size_t size)
Definition: checked_alloc.h:45
DEVICE auto copy(ARGS &&...args)
Definition: gpu_enabled.h:51
#define INJECT_TIMER(DESC)
Definition: measure.h:93
#define CHECK_NE(x, y)
Definition: Logger.h:231
const size_t size() const
MergedChunk linearizeVarLenArrayColFrags(const Catalog_Namespace::Catalog &cat, std::list< std::shared_ptr< Chunk_NS::Chunk >> &chunk_holder, std::list< ChunkIter > &chunk_iter_holder, std::list< std::shared_ptr< Chunk_NS::Chunk >> &local_chunk_holder, std::list< ChunkIter > &local_chunk_iter_holder, std::list< size_t > &local_chunk_num_tuples, MemoryLevel memory_level, const ColumnDescriptor *cd, const int device_id, const size_t total_data_buf_size, const size_t total_idx_buf_size, const size_t total_num_tuples, DeviceAllocator *device_allocator, const size_t thread_idx) const
MergedChunk linearizeFixedLenArrayColFrags(const Catalog_Namespace::Catalog &cat, std::list< std::shared_ptr< Chunk_NS::Chunk >> &chunk_holder, std::list< ChunkIter > &chunk_iter_holder, std::list< std::shared_ptr< Chunk_NS::Chunk >> &local_chunk_holder, std::list< ChunkIter > &local_chunk_iter_holder, std::list< size_t > &local_chunk_num_tuples, MemoryLevel memory_level, const ColumnDescriptor *cd, const int device_id, const size_t total_data_buf_size, const size_t total_idx_buf_size, const size_t total_num_tuples, DeviceAllocator *device_allocator, const size_t thread_idx) const
Used by Fragmenter classes to store info about each fragment - the fragment id and number of tuples (rows) currently stored by that fragment.
Definition: Fragmenter.h:86
int getColId() const
int getTableId() const
#define LIKELY(x)
Definition: likely.h:24
std::mutex linearized_col_cache_mutex_
An AbstractBuffer is a unit of data management for a data manager.
specifies the content in-memory of a row in the column metadata table
ColumnFetcher(Executor *executor, const ColumnCacheMap &column_cache)
const ChunkMetadataMap & getChunkMetadataMap() const
std::mutex chunk_list_mutex_
const SQLTypeInfo & get_type_info() const
Definition: Analyzer.h:82
void freeTemporaryCpuLinearizedIdxBuf()
std::unordered_map< int, std::unordered_map< int, std::shared_ptr< const ColumnarResults >>> ColumnCacheMap
InputSourceType getSourceType() const
int skip_size
Definition: ChunkIter.h:37
#define CHECK_LT(x, y)
Definition: Logger.h:232
static const int8_t * transferColumnIfNeeded(const ColumnarResults *columnar_results, const int col_id, Data_Namespace::DataMgr *data_mgr, const Data_Namespace::MemoryLevel memory_level, const int device_id, DeviceAllocator *device_allocator)
static std::pair< const int8_t *, size_t > getOneColumnFragment(Executor *executor, const Analyzer::ColumnVar &hash_col, const Fragmenter_Namespace::FragmentInfo &fragment, const Data_Namespace::MemoryLevel effective_mem_lvl, const int device_id, DeviceAllocator *device_allocator, const size_t thread_idx, std::vector< std::shared_ptr< Chunk_NS::Chunk >> &chunks_owner, ColumnCacheMap &column_cache)
Gets one chunk's pointer and element count on either CPU or GPU.
static constexpr size_t DEFAULT_NULL_PADDING_SIZE
int32_t ArrayOffsetT
Definition: sqltypes.h:1225
std::pair< AbstractBuffer *, AbstractBuffer * > MergedChunk
Definition: ColumnFetcher.h:47
int8_t * second_buf
Definition: ChunkIter.h:32
virtual void append(int8_t *src, const size_t num_bytes, const MemoryLevel src_buffer_type=CPU_LEVEL, const int device_id=-1)=0
int skip
Definition: ChunkIter.h:36
#define CHECK(condition)
Definition: Logger.h:222
#define DEBUG_TIMER(name)
Definition: Logger.h:371
std::string getMemoryLevelString(Data_Namespace::MemoryLevel memoryLevel)
ChunkIter prepareChunkIter(AbstractBuffer *merged_data_buf, AbstractBuffer *merged_index_buf, ChunkIter &chunk_iter, bool is_true_varlen_type, const size_t total_num_tuples) const
std::unordered_map< InputColDescriptor, DeviceMergedChunkIterMap > linearized_multi_frag_chunk_iter_cache_
static std::shared_ptr< Chunk > getChunk(const ColumnDescriptor *cd, DataMgr *data_mgr, const ChunkKey &key, const MemoryLevel mem_level, const int deviceId, const size_t num_bytes, const size_t num_elems, const bool pinnable=true)
Definition: Chunk.cpp:31
int get_column_id() const
Definition: Analyzer.h:202
std::unordered_map< InputColDescriptor, DeviceMergedChunkMap > linearized_idx_buf_cache_
unencoded array encoder
size_t num_elems
const std::vector< int8_t * > & getColumnBuffers() const
int cpu_threads()
Definition: thread_count.h:25
Divide up indexes (A, A+1, A+2, ..., B-2, B-1) among N workers as evenly as possible in a range-based for loop.
const InputDescriptor & getScanDesc() const
size_t g_enable_parallel_linearization
Definition: Execute.cpp:144
#define VLOG(n)
Definition: Logger.h:316
Type timer_start()
Definition: measure.h:42
const int8_t * getAllTableColumnFragments(const int table_id, const int col_id, const std::map< int, const TableFragments * > &all_tables_fragments, const Data_Namespace::MemoryLevel memory_level, const int device_id, DeviceAllocator *device_allocator, const size_t thread_idx) const
const ColumnDescriptor * get_column_descriptor(const int col_id, const int table_id, const Catalog_Namespace::Catalog &cat)
Definition: Execute.h:191
const SQLTypeInfo & getColumnType(const int col_id) const
AbstractBuffer * alloc(const MemoryLevel memoryLevel, const int deviceId, const size_t numBytes)
Definition: DataMgr.cpp:516