OmniSciDB  72c90bc290
ColumnFetcher.cpp
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "QueryEngine/ColumnFetcher.h"
18 
19 #include <memory>
20 
21 #include "DataMgr/ArrayNoneEncoder.h"
22 #include "QueryEngine/ErrorHandling.h"
23 #include "QueryEngine/Execute.h"
24 #include "Shared/Intervals.h"
25 #include "Shared/likely.h"
26 #include "Shared/sqltypes.h"
27 
28 namespace {
29 
30 const ColumnarResults* columnarize_result(
31     std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
32  const ResultSetPtr& result,
33  const size_t thread_idx,
34  const size_t executor_id,
35  const int frag_id) {
36   INJECT_TIMER(columnarize_result);
37   CHECK_EQ(0, frag_id);
38 
39  std::vector<SQLTypeInfo> col_types;
40  for (size_t i = 0; i < result->colCount(); ++i) {
41  const auto& src_ti = result->getColType(i);
42  CHECK_EQ(result->checkSlotUsesFlatBufferFormat(i), src_ti.usesFlatBuffer());
43  auto ti = get_logical_type_info(src_ti);
44  ti.setUsesFlatBuffer(src_ti.supportsFlatBuffer());
45  col_types.push_back(ti);
46  }
47  return new ColumnarResults(
48  row_set_mem_owner, *result, result->colCount(), col_types, executor_id, thread_idx);
49 }
50 
51 std::string getMemoryLevelString(Data_Namespace::MemoryLevel memoryLevel) {
52   switch (memoryLevel) {
53  case DISK_LEVEL:
54  return "DISK_LEVEL";
55  case GPU_LEVEL:
56  return "GPU_LEVEL";
57  case CPU_LEVEL:
58  return "CPU_LEVEL";
59  default:
60  return "UNKNOWN";
61  }
62 }
63 } // namespace
64 
65 ColumnFetcher::ColumnFetcher(Executor* executor, const ColumnCacheMap& column_cache)
66  : executor_(executor), columnarized_table_cache_(column_cache) {}
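// Illustrative usage sketch (not part of the original source; `executor`, `column_cache`,
// `hash_col` and `fragment_info` are assumed to be provided by the caller). A typical
// CPU-side fetch of a single join-key fragment could look like:
//
//   ColumnFetcher column_fetcher(executor, column_cache);
//   std::vector<std::shared_ptr<Chunk_NS::Chunk>> chunks_owner;
//   auto [col_buff, elem_count] = ColumnFetcher::getOneColumnFragment(
//       executor, hash_col, fragment_info, Data_Namespace::CPU_LEVEL,
//       /*device_id=*/0, /*device_allocator=*/nullptr, /*thread_idx=*/0,
//       chunks_owner, column_cache);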
67 
68 //! Gets one chunk's pointer and element count on either CPU or GPU.
71 std::pair<const int8_t*, size_t> ColumnFetcher::getOneColumnFragment(
72  Executor* executor,
73  const Analyzer::ColumnVar& hash_col,
74  const Fragmenter_Namespace::FragmentInfo& fragment,
75  const Data_Namespace::MemoryLevel effective_mem_lvl,
76  const int device_id,
77  DeviceAllocator* device_allocator,
78  const size_t thread_idx,
79  std::vector<std::shared_ptr<Chunk_NS::Chunk>>& chunks_owner,
80  ColumnCacheMap& column_cache) {
81  static std::mutex columnar_conversion_mutex;
82  auto timer = DEBUG_TIMER(__func__);
83  if (fragment.isEmptyPhysicalFragment()) {
84  return {nullptr, 0};
85  }
86  const auto& column_key = hash_col.getColumnKey();
87  const auto cd = get_column_descriptor_maybe(column_key);
88  CHECK(!cd || !(cd->isVirtualCol));
89  const int8_t* col_buff = nullptr;
90  if (cd) { // real table
91     /* chunk_meta_it is used here to retrieve the chunk's numBytes and
92        numElements. Their values are often zero; if we knew how to
93        predict the zero values, the getChunkMetadataMap call could be
94        avoided, skipping the synthesize_metadata calls it triggers. */
96  auto chunk_meta_it = fragment.getChunkMetadataMap().find(column_key.column_id);
97  CHECK(chunk_meta_it != fragment.getChunkMetadataMap().end());
98  ChunkKey chunk_key{column_key.db_id,
99  fragment.physicalTableId,
100  column_key.column_id,
101  fragment.fragmentId};
102  const auto chunk = Chunk_NS::Chunk::getChunk(
103  cd,
104  executor->getDataMgr(),
105  chunk_key,
106  effective_mem_lvl,
107  effective_mem_lvl == Data_Namespace::CPU_LEVEL ? 0 : device_id,
108  chunk_meta_it->second->numBytes,
109  chunk_meta_it->second->numElements);
110  chunks_owner.push_back(chunk);
111  CHECK(chunk);
112  auto ab = chunk->getBuffer();
113  CHECK(ab->getMemoryPtr());
114  col_buff = reinterpret_cast<int8_t*>(ab->getMemoryPtr());
115  } else { // temporary table
116  const ColumnarResults* col_frag{nullptr};
117  {
118  std::lock_guard<std::mutex> columnar_conversion_guard(columnar_conversion_mutex);
119  const auto frag_id = fragment.fragmentId;
120  shared::TableKey table_key{column_key.db_id, column_key.table_id};
121  if (column_cache.empty() || !column_cache.count(table_key)) {
122  column_cache.insert(std::make_pair(
123  table_key,
124  std::unordered_map<int, std::shared_ptr<const ColumnarResults>>()));
125  }
126  auto& frag_id_to_result = column_cache[table_key];
127  if (frag_id_to_result.empty() || !frag_id_to_result.count(frag_id)) {
128  frag_id_to_result.insert(std::make_pair(
129  frag_id,
130  std::shared_ptr<const ColumnarResults>(columnarize_result(
131  executor->row_set_mem_owner_,
132  get_temporary_table(executor->temporary_tables_, table_key.table_id),
133  executor->executor_id_,
134  thread_idx,
135  frag_id))));
136  }
137  col_frag = column_cache[table_key][frag_id].get();
138  }
139  col_buff = transferColumnIfNeeded(
140  col_frag,
141  hash_col.getColumnKey().column_id,
142  executor->getDataMgr(),
143  effective_mem_lvl,
144  effective_mem_lvl == Data_Namespace::CPU_LEVEL ? 0 : device_id,
145  device_allocator);
146  }
147  return {col_buff, fragment.getNumTuples()};
148 }
149 
150 //! Creates a JoinColumn struct containing an array of JoinChunk structs.
159 JoinColumn ColumnFetcher::makeJoinColumn(
160     Executor* executor,
161  const Analyzer::ColumnVar& hash_col,
162  const std::vector<Fragmenter_Namespace::FragmentInfo>& fragments,
163  const Data_Namespace::MemoryLevel effective_mem_lvl,
164  const int device_id,
165  DeviceAllocator* device_allocator,
166  const size_t thread_idx,
167  std::vector<std::shared_ptr<Chunk_NS::Chunk>>& chunks_owner,
168  std::vector<std::shared_ptr<void>>& malloc_owner,
169  ColumnCacheMap& column_cache) {
170  CHECK(!fragments.empty());
171 
172  size_t col_chunks_buff_sz = sizeof(struct JoinChunk) * fragments.size();
173  // TODO: needs an allocator owner
174  auto col_chunks_buff = reinterpret_cast<int8_t*>(
175  malloc_owner.emplace_back(checked_malloc(col_chunks_buff_sz), free).get());
176  auto join_chunk_array = reinterpret_cast<struct JoinChunk*>(col_chunks_buff);
177 
178  size_t num_elems = 0;
179  size_t num_chunks = 0;
180  for (auto& frag : fragments) {
181     if (g_enable_non_kernel_time_query_interrupt &&
182         executor->checkNonKernelTimeInterrupted()) {
183       throw QueryExecutionError(Executor::ERR_INTERRUPTED);
184     }
185  auto [col_buff, elem_count] = getOneColumnFragment(
186  executor,
187  hash_col,
188  frag,
189  effective_mem_lvl,
190  effective_mem_lvl == Data_Namespace::CPU_LEVEL ? 0 : device_id,
191  device_allocator,
192  thread_idx,
193  chunks_owner,
194  column_cache);
195  if (col_buff != nullptr) {
196  num_elems += elem_count;
197  join_chunk_array[num_chunks] = JoinChunk{col_buff, elem_count};
198  } else {
199  continue;
200  }
201  ++num_chunks;
202  }
203 
204  int elem_sz = hash_col.get_type_info().get_size();
205  CHECK_GT(elem_sz, 0);
206 
207  return {col_chunks_buff,
208  col_chunks_buff_sz,
209  num_chunks,
210  num_elems,
211  static_cast<size_t>(elem_sz)};
212 }
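// Editorial note: the five values returned above initialize a JoinColumn in declaration
// order: the malloc'ed JoinChunk array (col_chunks_buff), its byte size, the number of
// non-empty chunks, the total element count, and the fixed element size. JoinColumn and
// JoinChunk themselves are defined in the join hash table runtime code, not in this file.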
213 
214 const int8_t* ColumnFetcher::getOneTableColumnFragment(
215     const shared::TableKey& table_key,
216  const int frag_id,
217  const int col_id,
218  const std::map<shared::TableKey, const TableFragments*>& all_tables_fragments,
219  std::list<std::shared_ptr<Chunk_NS::Chunk>>& chunk_holder,
220  std::list<ChunkIter>& chunk_iter_holder,
221  const Data_Namespace::MemoryLevel memory_level,
222  const int device_id,
223  DeviceAllocator* allocator) const {
224  const auto fragments_it = all_tables_fragments.find(table_key);
225  CHECK(fragments_it != all_tables_fragments.end());
226  const auto fragments = fragments_it->second;
227  const auto& fragment = (*fragments)[frag_id];
228  if (fragment.isEmptyPhysicalFragment()) {
229  return nullptr;
230  }
231  std::shared_ptr<Chunk_NS::Chunk> chunk;
232  auto chunk_meta_it = fragment.getChunkMetadataMap().find(col_id);
233  CHECK(chunk_meta_it != fragment.getChunkMetadataMap().end());
234  CHECK(table_key.table_id > 0);
235  const auto cd = get_column_descriptor({table_key, col_id});
236  CHECK(cd);
237  const auto col_type =
238  get_column_type(col_id, table_key.table_id, cd, executor_->temporary_tables_);
239  const bool is_real_string =
240  col_type.is_string() && col_type.get_compression() == kENCODING_NONE;
241  const bool is_varlen =
242  is_real_string ||
243  col_type.is_array(); // TODO: should it be col_type.is_varlen_array() ?
244  {
245  ChunkKey chunk_key{
246  table_key.db_id, fragment.physicalTableId, col_id, fragment.fragmentId};
247  std::unique_ptr<std::lock_guard<std::mutex>> varlen_chunk_lock;
248  if (is_varlen) {
249  varlen_chunk_lock.reset(new std::lock_guard<std::mutex>(varlen_chunk_fetch_mutex_));
250  }
251     chunk = Chunk_NS::Chunk::getChunk(
252         cd,
253  executor_->getDataMgr(),
254  chunk_key,
255  memory_level,
256  memory_level == Data_Namespace::CPU_LEVEL ? 0 : device_id,
257  chunk_meta_it->second->numBytes,
258  chunk_meta_it->second->numElements);
259  std::lock_guard<std::mutex> chunk_list_lock(chunk_list_mutex_);
260  chunk_holder.push_back(chunk);
261  }
262  if (is_varlen) {
263  CHECK_GT(table_key.table_id, 0);
264  CHECK(chunk_meta_it != fragment.getChunkMetadataMap().end());
265  chunk_iter_holder.push_back(chunk->begin_iterator(chunk_meta_it->second));
266  auto& chunk_iter = chunk_iter_holder.back();
267  if (memory_level == Data_Namespace::CPU_LEVEL) {
268  return reinterpret_cast<int8_t*>(&chunk_iter);
269  } else {
270  auto ab = chunk->getBuffer();
271  ab->pin();
272  auto& row_set_mem_owner = executor_->getRowSetMemoryOwner();
273  row_set_mem_owner->addVarlenInputBuffer(ab);
274  CHECK_EQ(Data_Namespace::GPU_LEVEL, memory_level);
275  CHECK(allocator);
276  auto chunk_iter_gpu = allocator->alloc(sizeof(ChunkIter));
277  allocator->copyToDevice(
278  chunk_iter_gpu, reinterpret_cast<int8_t*>(&chunk_iter), sizeof(ChunkIter));
279  return chunk_iter_gpu;
280  }
281  } else {
282  auto ab = chunk->getBuffer();
283  CHECK(ab->getMemoryPtr());
284  return ab->getMemoryPtr(); // @TODO(alex) change to use ChunkIter
285  }
286 }
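// Editorial note: the return convention above depends on the column type. For var-length
// columns the caller receives a pointer to a ChunkIter (CPU-resident, or a device copy of
// it when memory_level is GPU_LEVEL), while for fixed-width columns the raw chunk data
// pointer is returned directly; callers are expected to distinguish the two cases from
// the column's type info.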
287 
288 const int8_t* ColumnFetcher::getAllTableColumnFragments(
289     const shared::TableKey& table_key,
290  const int col_id,
291  const std::map<shared::TableKey, const TableFragments*>& all_tables_fragments,
292  const Data_Namespace::MemoryLevel memory_level,
293  const int device_id,
294  DeviceAllocator* device_allocator,
295  const size_t thread_idx) const {
296  const auto fragments_it = all_tables_fragments.find(table_key);
297  CHECK(fragments_it != all_tables_fragments.end());
298  const auto fragments = fragments_it->second;
299  const auto frag_count = fragments->size();
300  std::vector<std::unique_ptr<ColumnarResults>> column_frags;
301  const ColumnarResults* table_column = nullptr;
302  const InputColDescriptor col_desc(col_id, table_key.table_id, table_key.db_id, int(0));
303   CHECK(col_desc.getScanDesc().getSourceType() == InputSourceType::TABLE);
304   {
305  std::lock_guard<std::mutex> columnar_conversion_guard(columnar_fetch_mutex_);
306  auto column_it = columnarized_scan_table_cache_.find(col_desc);
307  if (column_it == columnarized_scan_table_cache_.end()) {
308  for (size_t frag_id = 0; frag_id < frag_count; ++frag_id) {
309       if (g_enable_non_kernel_time_query_interrupt &&
310           executor_->checkNonKernelTimeInterrupted()) {
311         throw QueryExecutionError(Executor::ERR_INTERRUPTED);
312       }
313  std::list<std::shared_ptr<Chunk_NS::Chunk>> chunk_holder;
314  std::list<ChunkIter> chunk_iter_holder;
315  const auto& fragment = (*fragments)[frag_id];
316  if (fragment.isEmptyPhysicalFragment()) {
317  continue;
318  }
319  auto chunk_meta_it = fragment.getChunkMetadataMap().find(col_id);
320  CHECK(chunk_meta_it != fragment.getChunkMetadataMap().end());
321  auto col_buffer = getOneTableColumnFragment(table_key,
322  static_cast<int>(frag_id),
323  col_id,
324  all_tables_fragments,
325  chunk_holder,
326  chunk_iter_holder,
327                                                   Data_Namespace::CPU_LEVEL,
328                                                   int(0),
329  device_allocator);
330  column_frags.push_back(
331  std::make_unique<ColumnarResults>(executor_->row_set_mem_owner_,
332  col_buffer,
333  fragment.getNumTuples(),
334  chunk_meta_it->second->sqlType,
335  executor_->executor_id_,
336  thread_idx));
337  }
338  auto merged_results =
339  ColumnarResults::mergeResults(executor_->row_set_mem_owner_, column_frags);
340  table_column = merged_results.get();
341  columnarized_scan_table_cache_.emplace(col_desc, std::move(merged_results));
342  } else {
343  table_column = column_it->second.get();
344  }
345  }
346  return ColumnFetcher::transferColumnIfNeeded(table_column,
347  0,
348  executor_->getDataMgr(),
349  memory_level,
350  device_id,
351  device_allocator);
352 }
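// Editorial note: getAllTableColumnFragments always columnarizes and merges the fragments
// in CPU memory (hence the CPU_LEVEL / device 0 arguments above) and caches the merged
// ColumnarResults per input column; only the final transferColumnIfNeeded call copies the
// merged buffer to a GPU when memory_level is GPU_LEVEL.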
353 
354 const int8_t* ColumnFetcher::getResultSetColumn(
355     const InputColDescriptor* col_desc,
356  const Data_Namespace::MemoryLevel memory_level,
357  const int device_id,
358  DeviceAllocator* device_allocator,
359  const size_t thread_idx) const {
360  CHECK(col_desc);
361  const auto table_key = col_desc->getScanDesc().getTableKey();
362  return getResultSetColumn(
363  get_temporary_table(executor_->temporary_tables_, table_key.table_id),
364  table_key,
365  col_desc->getColId(),
366  memory_level,
367  device_id,
368  device_allocator,
369  thread_idx);
370 }
371 
372 const int8_t* ColumnFetcher::linearizeColumnFragments(
373     const shared::TableKey& table_key,
374  const int col_id,
375  const std::map<shared::TableKey, const TableFragments*>& all_tables_fragments,
376  std::list<std::shared_ptr<Chunk_NS::Chunk>>& chunk_holder,
377  std::list<ChunkIter>& chunk_iter_holder,
378  const Data_Namespace::MemoryLevel memory_level,
379  const int device_id,
380  DeviceAllocator* device_allocator,
381  const size_t thread_idx) const {
382  auto timer = DEBUG_TIMER(__func__);
383  const auto fragments_it = all_tables_fragments.find(table_key);
384  CHECK(fragments_it != all_tables_fragments.end());
385  const auto fragments = fragments_it->second;
386  const auto frag_count = fragments->size();
387  const InputColDescriptor col_desc(col_id, table_key.table_id, table_key.db_id, int(0));
388  const auto cd = get_column_descriptor({table_key, col_id});
389  CHECK(cd);
391  CHECK_GT(table_key.table_id, 0);
392  bool is_varlen_chunk = cd->columnType.is_varlen() && !cd->columnType.is_fixlen_array();
393  size_t total_num_tuples = 0;
394  size_t total_data_buf_size = 0;
395  size_t total_idx_buf_size = 0;
396  {
397  std::lock_guard<std::mutex> linearize_guard(linearized_col_cache_mutex_);
398  auto linearized_iter_it = linearized_multi_frag_chunk_iter_cache_.find(col_desc);
399  if (linearized_iter_it != linearized_multi_frag_chunk_iter_cache_.end()) {
400  if (memory_level == CPU_LEVEL) {
401       // in CPU execution, kernels can share the merged chunk since they operate in the
402       // same memory space, so we can share the same chunk iter among kernels
403  return getChunkiter(col_desc, 0);
404  } else {
405       // in GPU execution, this matters when we deploy multiple GPUs:
406       // we only share the chunk_iter if kernels are launched on the same GPU device;
407       // otherwise we need to load the merged chunk and its iter separately per device
408       // todo(yoonmin): D2D copy of merged chunk and its iter?
409  if (linearized_iter_it->second.find(device_id) !=
410  linearized_iter_it->second.end()) {
411         // note that the cached chunk_iter lives in CPU memory space;
412         // we just need to copy it to each device, since the chunk_iter already
413         // contains the correct buffer address for the execution device
414  auto chunk_iter_gpu = device_allocator->alloc(sizeof(ChunkIter));
415  device_allocator->copyToDevice(
416  chunk_iter_gpu, getChunkiter(col_desc, device_id), sizeof(ChunkIter));
417  return chunk_iter_gpu;
418  }
419  }
420  }
421  }
422 
423   // collect target fragments
424   // basically we load the chunks into CPU first, then do the manipulation
425   // needed to make the semantics of the merged chunk correct
426  std::shared_ptr<Chunk_NS::Chunk> chunk;
427  std::list<std::shared_ptr<Chunk_NS::Chunk>> local_chunk_holder;
428  std::list<ChunkIter> local_chunk_iter_holder;
429  std::list<size_t> local_chunk_num_tuples;
430  {
431  std::lock_guard<std::mutex> linearize_guard(varlen_chunk_fetch_mutex_);
432  for (size_t frag_id = 0; frag_id < frag_count; ++frag_id) {
433  const auto& fragment = (*fragments)[frag_id];
434  if (fragment.isEmptyPhysicalFragment()) {
435  continue;
436  }
437  auto chunk_meta_it = fragment.getChunkMetadataMap().find(col_id);
438  CHECK(chunk_meta_it != fragment.getChunkMetadataMap().end());
439  ChunkKey chunk_key{
440  table_key.db_id, fragment.physicalTableId, col_id, fragment.fragmentId};
441  chunk = Chunk_NS::Chunk::getChunk(cd,
442  executor_->getDataMgr(),
443  chunk_key,
444                                         Data_Namespace::CPU_LEVEL,
445                                         0,
446  chunk_meta_it->second->numBytes,
447  chunk_meta_it->second->numElements);
448  local_chunk_holder.push_back(chunk);
449  auto chunk_iter = chunk->begin_iterator(chunk_meta_it->second);
450  local_chunk_iter_holder.push_back(chunk_iter);
451  local_chunk_num_tuples.push_back(fragment.getNumTuples());
452  total_num_tuples += fragment.getNumTuples();
453  total_data_buf_size += chunk->getBuffer()->size();
454  std::ostringstream oss;
455  oss << "Load chunk for col_name: " << chunk->getColumnDesc()->columnName
456  << ", col_id: " << chunk->getColumnDesc()->columnId << ", Frag-" << frag_id
457  << ", numTuples: " << fragment.getNumTuples()
458  << ", data_size: " << chunk->getBuffer()->size();
459  if (chunk->getIndexBuf()) {
460  auto idx_buf_size = chunk->getIndexBuf()->size() - sizeof(ArrayOffsetT);
461  oss << ", index_size: " << idx_buf_size;
462  total_idx_buf_size += idx_buf_size;
463  }
464  VLOG(2) << oss.str();
465  }
466  }
467 
468  auto& col_ti = cd->columnType;
469  MergedChunk res{nullptr, nullptr};
470   // Linearize the multi-fragment column according to its type.
471   // We cover array and none-encoded text columns.
472   // Note that a geo column is actually organized as a set of arrays,
473   // and each geo object requires a different set of vectors;
474   // here we linearize one array at a time, so eventually the geo object
475   // ends up with a set of "linearized" arrays
476  {
477  std::lock_guard<std::mutex> linearization_guard(linearization_mutex_);
478  if (col_ti.is_array()) {
479  if (col_ti.is_fixlen_array()) {
480  VLOG(2) << "Linearize fixed-length multi-frag array column (col_id: "
481  << cd->columnId << ", col_name: " << cd->columnName
482  << ", device_type: " << getMemoryLevelString(memory_level)
483  << ", device_id: " << device_id << "): " << cd->columnType.to_string();
484         res = linearizeFixedLenArrayColFrags(table_key.db_id,
485                                              chunk_holder,
486  chunk_iter_holder,
487  local_chunk_holder,
488  local_chunk_iter_holder,
489  local_chunk_num_tuples,
490  memory_level,
491  cd,
492  device_id,
493  total_data_buf_size,
494  total_idx_buf_size,
495  total_num_tuples,
496  device_allocator,
497  thread_idx);
498  } else {
499  CHECK(col_ti.is_varlen_array());
500  VLOG(2) << "Linearize variable-length multi-frag array column (col_id: "
501  << cd->columnId << ", col_name: " << cd->columnName
502  << ", device_type: " << getMemoryLevelString(memory_level)
503  << ", device_id: " << device_id << "): " << cd->columnType.to_string();
504         res = linearizeVarLenArrayColFrags(table_key.db_id,
505                                            chunk_holder,
506  chunk_iter_holder,
507  local_chunk_holder,
508  local_chunk_iter_holder,
509  local_chunk_num_tuples,
510  memory_level,
511  cd,
512  device_id,
513  total_data_buf_size,
514  total_idx_buf_size,
515  total_num_tuples,
516  device_allocator,
517  thread_idx);
518  }
519  }
520  if (col_ti.is_string() && !col_ti.is_dict_encoded_string()) {
521  VLOG(2) << "Linearize variable-length multi-frag non-encoded text column (col_id: "
522  << cd->columnId << ", col_name: " << cd->columnName
523  << ", device_type: " << getMemoryLevelString(memory_level)
524  << ", device_id: " << device_id << "): " << cd->columnType.to_string();
525       res = linearizeVarLenArrayColFrags(table_key.db_id,
526                                          chunk_holder,
527  chunk_iter_holder,
528  local_chunk_holder,
529  local_chunk_iter_holder,
530  local_chunk_num_tuples,
531  memory_level,
532  cd,
533  device_id,
534  total_data_buf_size,
535  total_idx_buf_size,
536  total_num_tuples,
537  device_allocator,
538  thread_idx);
539  }
540  }
541  CHECK(res.first); // check merged data buffer
542  if (!col_ti.is_fixlen_array()) {
543  CHECK(res.second); // check merged index buffer
544  }
545  auto merged_data_buffer = res.first;
546  auto merged_index_buffer = res.second;
547 
548  // prepare ChunkIter for the linearized chunk
549  auto merged_chunk = std::make_shared<Chunk_NS::Chunk>(
550  merged_data_buffer, merged_index_buffer, cd, false);
551   // to prepare the chunk_iter for the merged chunk, we pass one of the local chunk
552   // iters to fill in the metadata that is common to all merged chunks
553  auto merged_chunk_iter = prepareChunkIter(merged_data_buffer,
554  merged_index_buffer,
555  *(local_chunk_iter_holder.rbegin()),
556  is_varlen_chunk,
557  total_num_tuples);
558  {
559  std::lock_guard<std::mutex> chunk_list_lock(chunk_list_mutex_);
560  chunk_holder.push_back(merged_chunk);
561  chunk_iter_holder.push_back(merged_chunk_iter);
562  }
563 
564  auto merged_chunk_iter_ptr = reinterpret_cast<int8_t*>(&(chunk_iter_holder.back()));
565  if (memory_level == MemoryLevel::CPU_LEVEL) {
566  addMergedChunkIter(col_desc, 0, merged_chunk_iter_ptr);
567  return merged_chunk_iter_ptr;
568  } else {
569  CHECK_EQ(Data_Namespace::GPU_LEVEL, memory_level);
570  CHECK(device_allocator);
571  addMergedChunkIter(col_desc, device_id, merged_chunk_iter_ptr);
572     // note that merged_chunk_iter_ptr resides in CPU memory space, but its content
573     // points into the GPU buffer we allocated for merging,
574     // so we need to copy this chunk_iter to each device explicitly
575  auto chunk_iter_gpu = device_allocator->alloc(sizeof(ChunkIter));
576  device_allocator->copyToDevice(
577  chunk_iter_gpu, merged_chunk_iter_ptr, sizeof(ChunkIter));
578  return chunk_iter_gpu;
579  }
580 }
581 
582 MergedChunk ColumnFetcher::linearizeVarLenArrayColFrags(
583     int32_t db_id,
584  std::list<std::shared_ptr<Chunk_NS::Chunk>>& chunk_holder,
585  std::list<ChunkIter>& chunk_iter_holder,
586  std::list<std::shared_ptr<Chunk_NS::Chunk>>& local_chunk_holder,
587  std::list<ChunkIter>& local_chunk_iter_holder,
588  std::list<size_t>& local_chunk_num_tuples,
589  MemoryLevel memory_level,
590  const ColumnDescriptor* cd,
591  const int device_id,
592  const size_t total_data_buf_size,
593  const size_t total_idx_buf_size,
594  const size_t total_num_tuples,
595  DeviceAllocator* device_allocator,
596  const size_t thread_idx) const {
597   // for linearization of a varlen column we have to deal not only with the data buffer
598   // but also with its underlying index buffer, which holds the offset of each varlen value.
599   // basically we maintain a per-device linearized (data/index) buffer:
600   // for the data buffer, we linearize the varlen column's chunks within a device-specific
601   // buffer by simply appending each chunk;
602   // for the index buffer, we not only append each chunk but also modify the offset
603   // values to account for conditions like nullness and padding, so we first append the
604   // index buffers on CPU, manipulate them as required, and then copy the result to the
605   // device if necessary (for GPU execution)
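  // Worked example (editorial, with made-up sizes): suppose chunk A holds 3 rows with a
  // 9-byte data buffer and index buffer [0, 4, 7, 9], and chunk B holds 2 rows with a
  // 6-byte data buffer and index buffer [0, 2, 6]. The merged data buffer is A's bytes
  // followed by B's (15 bytes); the merged index buffer keeps A's offsets, rebases B's
  // by 9 (A's data size) and writes the final offset last: [0, 4, 7, 9, 11, 15].
  // Nulls and padded first elements complicate this picture, which is what the code
  // below deals with.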
606  AbstractBuffer* merged_index_buffer_in_cpu = nullptr;
607  AbstractBuffer* merged_data_buffer = nullptr;
608  bool has_cached_merged_idx_buf = false;
609  bool has_cached_merged_data_buf = false;
610  const InputColDescriptor icd(cd->columnId, cd->tableId, db_id, int(0));
611   // check the linearized buffer's cache first;
612   // if it does not exist, alloc the buffer space needed to prepare linearization
613  int64_t linearization_time_ms = 0;
614  auto clock_begin = timer_start();
615  {
616  std::lock_guard<std::mutex> linearized_col_cache_guard(linearized_col_cache_mutex_);
617  auto cached_data_buf_cache_it = linearized_data_buf_cache_.find(icd);
618  if (cached_data_buf_cache_it != linearized_data_buf_cache_.end()) {
619  auto& cd_cache = cached_data_buf_cache_it->second;
620  auto cached_data_buf_it = cd_cache.find(device_id);
621  if (cached_data_buf_it != cd_cache.end()) {
622  has_cached_merged_data_buf = true;
623  merged_data_buffer = cached_data_buf_it->second;
624  VLOG(2) << "Recycle merged data buffer for linearized chunks (memory_level: "
625  << getMemoryLevelString(memory_level) << ", device_id: " << device_id
626  << ")";
627  } else {
628  merged_data_buffer =
629  executor_->getDataMgr()->alloc(memory_level, device_id, total_data_buf_size);
630  VLOG(2) << "Allocate " << total_data_buf_size
631  << " bytes of data buffer space for linearized chunks (memory_level: "
632  << getMemoryLevelString(memory_level) << ", device_id: " << device_id
633  << ")";
634  cd_cache.insert(std::make_pair(device_id, merged_data_buffer));
635  }
636  } else {
637       DeviceMergedChunkMap m;
638       merged_data_buffer =
639  executor_->getDataMgr()->alloc(memory_level, device_id, total_data_buf_size);
640  VLOG(2) << "Allocate " << total_data_buf_size
641  << " bytes of data buffer space for linearized chunks (memory_level: "
642  << getMemoryLevelString(memory_level) << ", device_id: " << device_id
643  << ")";
644  m.insert(std::make_pair(device_id, merged_data_buffer));
645  linearized_data_buf_cache_.insert(std::make_pair(icd, m));
646  }
647 
648  auto cached_index_buf_it =
649         linearlized_temporary_cpu_index_buf_cache_.find(cd->columnId);
650     if (cached_index_buf_it != linearlized_temporary_cpu_index_buf_cache_.end()) {
651  has_cached_merged_idx_buf = true;
652  merged_index_buffer_in_cpu = cached_index_buf_it->second;
653  VLOG(2)
654  << "Recycle merged temporary idx buffer for linearized chunks (memory_level: "
655  << getMemoryLevelString(memory_level) << ", device_id: " << device_id << ")";
656  } else {
657  auto idx_buf_size = total_idx_buf_size + sizeof(ArrayOffsetT);
658  merged_index_buffer_in_cpu =
659  executor_->getDataMgr()->alloc(Data_Namespace::CPU_LEVEL, 0, idx_buf_size);
660  VLOG(2) << "Allocate " << idx_buf_size
661  << " bytes of temporary idx buffer space on CPU for linearized chunks";
662  // just copy the buf addr since we access it via the pointer itself
663       linearlized_temporary_cpu_index_buf_cache_.insert(
664           std::make_pair(cd->columnId, merged_index_buffer_in_cpu));
665  }
666  }
667 
668  // linearize buffers if we don't have corresponding buf in cache
669  size_t sum_data_buf_size = 0;
670  size_t cur_sum_num_tuples = 0;
671  size_t total_idx_size_modifier = 0;
672  auto chunk_holder_it = local_chunk_holder.begin();
673  auto chunk_iter_holder_it = local_chunk_iter_holder.begin();
674  auto chunk_num_tuple_it = local_chunk_num_tuples.begin();
675  bool null_padded_first_elem = false;
676  bool null_padded_last_val = false;
677   // before entering the actual linearization part, we first need to check for the
678   // overflow case where the sum of the index offsets becomes larger than 2GB, which
679   // currently produces an incorrect query result due to negative array offsets.
680   // note that we can keep this separate from the main linearization logic because
681   // we only need to look at the last few elements of each chunk
682   // todo (yoonmin) : relax this to support larger chunk size (>2GB)
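  // Editorial note: ArrayOffsetT is a 32-bit signed offset, so once the accumulated data
  // size pushes a rebased offset past 2^31 - 1 it wraps negative, and negative offsets are
  // reserved for marking nulls; hence the early bail-out below.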
683  for (; chunk_holder_it != local_chunk_holder.end();
684  chunk_holder_it++, chunk_num_tuple_it++) {
685  // check the offset overflow based on the last "valid" offset for each chunk
686  auto target_chunk = chunk_holder_it->get();
687  auto target_chunk_data_buffer = target_chunk->getBuffer();
688  auto target_chunk_idx_buffer = target_chunk->getIndexBuf();
689  auto target_idx_buf_ptr =
690  reinterpret_cast<ArrayOffsetT*>(target_chunk_idx_buffer->getMemoryPtr());
691  auto cur_chunk_num_tuples = *chunk_num_tuple_it;
692  ArrayOffsetT original_offset = -1;
693  size_t cur_idx = cur_chunk_num_tuples;
694  // find the valid (e.g., non-null) offset starting from the last elem
695  while (original_offset < 0) {
696  original_offset = target_idx_buf_ptr[--cur_idx];
697  }
698  ArrayOffsetT new_offset = original_offset + sum_data_buf_size;
699  if (new_offset < 0) {
700  throw std::runtime_error(
701  "Linearization of a variable-length column having chunk size larger than 2GB "
702  "not supported yet");
703  }
704  sum_data_buf_size += target_chunk_data_buffer->size();
705  }
706  chunk_holder_it = local_chunk_holder.begin();
707  chunk_num_tuple_it = local_chunk_num_tuples.begin();
708  sum_data_buf_size = 0;
709 
710  for (; chunk_holder_it != local_chunk_holder.end();
711  chunk_holder_it++, chunk_iter_holder_it++, chunk_num_tuple_it++) {
712     if (g_enable_non_kernel_time_query_interrupt &&
713         executor_->checkNonKernelTimeInterrupted()) {
714       throw QueryExecutionError(Executor::ERR_INTERRUPTED);
715     }
716  auto target_chunk = chunk_holder_it->get();
717  auto target_chunk_data_buffer = target_chunk->getBuffer();
718  auto cur_chunk_num_tuples = *chunk_num_tuple_it;
719  auto target_chunk_idx_buffer = target_chunk->getIndexBuf();
720  auto target_idx_buf_ptr =
721  reinterpret_cast<ArrayOffsetT*>(target_chunk_idx_buffer->getMemoryPtr());
722  auto idx_buf_size = target_chunk_idx_buffer->size() - sizeof(ArrayOffsetT);
723  auto target_data_buffer_start_ptr = target_chunk_data_buffer->getMemoryPtr();
724  auto target_data_buffer_size = target_chunk_data_buffer->size();
725 
726  // when linearizing idx buffers, we need to consider the following cases
727  // 1. the first idx val is padded (a. null / b. empty varlen arr / c. 1-byte size
728  // varlen arr, i.e., {1})
729  // 2. the last idx val is null
730     // 3. null value(s) located in the middle of the idx buf <-- needs no special handling
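    // Editorial note: in this index-buffer format a negative entry marks a null row
    // while its magnitude still carries the offset at which valid data continues,
    // which is why the code below rebases only non-negative entries and patches the
    // null rows afterwards from their preceding entry.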
731  if (cur_sum_num_tuples > 0 && target_idx_buf_ptr[0] > 0) {
732  null_padded_first_elem = true;
733  target_data_buffer_start_ptr += ArrayNoneEncoder::DEFAULT_NULL_PADDING_SIZE;
734  target_data_buffer_size -= ArrayNoneEncoder::DEFAULT_NULL_PADDING_SIZE;
735  total_idx_size_modifier += ArrayNoneEncoder::DEFAULT_NULL_PADDING_SIZE;
736  }
737  // we linearize data_buf in device-specific buffer
738  if (!has_cached_merged_data_buf) {
739  merged_data_buffer->append(target_data_buffer_start_ptr,
740  target_data_buffer_size,
741                                  Data_Namespace::CPU_LEVEL,
742                                  device_id);
743  }
744 
745  if (!has_cached_merged_idx_buf) {
746  // linearize idx buf in CPU first
747  merged_index_buffer_in_cpu->append(target_chunk_idx_buffer->getMemoryPtr(),
748  idx_buf_size,
749                                          Data_Namespace::CPU_LEVEL,
750                                          0);  // merged_index_buffer_in_cpu resides in CPU
751  auto idx_buf_ptr =
752  reinterpret_cast<ArrayOffsetT*>(merged_index_buffer_in_cpu->getMemoryPtr());
753       // here, we do not need to manipulate the very first idx buf; we leave it as is
754       // and modify the rest (i.e., starting from the second chunk's idx buf)
755  if (cur_sum_num_tuples > 0) {
756  if (null_padded_last_val) {
757  // case 2. the previous chunk's last index val is null so we need to set this
758  // chunk's first val to be null
759  idx_buf_ptr[cur_sum_num_tuples] = -sum_data_buf_size;
760  }
761  const size_t worker_count = cpu_threads();
762  std::vector<std::future<void>> conversion_threads;
763  std::vector<std::vector<size_t>> null_padded_row_idx_vecs(worker_count,
764  std::vector<size_t>());
765  bool is_parallel_modification = false;
766  std::vector<size_t> null_padded_row_idx_vec;
767  const auto do_work = [&cur_sum_num_tuples,
768  &sum_data_buf_size,
769  &null_padded_first_elem,
770  &idx_buf_ptr](
771  const size_t start,
772  const size_t end,
773  const bool is_parallel_modification,
774  std::vector<size_t>* null_padded_row_idx_vec) {
775  for (size_t i = start; i < end; i++) {
776  if (LIKELY(idx_buf_ptr[cur_sum_num_tuples + i] >= 0)) {
777  if (null_padded_first_elem) {
778  // deal with null padded bytes
779  idx_buf_ptr[cur_sum_num_tuples + i] -=
780                   ArrayNoneEncoder::DEFAULT_NULL_PADDING_SIZE;
781             }
782  idx_buf_ptr[cur_sum_num_tuples + i] += sum_data_buf_size;
783  } else {
784               // a null padded row needs to reference the previous row's idx, so in
785               // multi-threaded index modification we may suffer from thread contention
786               // when thread-i needs to reference thread-j's row idx; we therefore
787               // collect the row idxs of null rows here and deal with them after this
788               // step
789  null_padded_row_idx_vec->push_back(cur_sum_num_tuples + i);
790  }
791  }
792  };
793  if (cur_chunk_num_tuples > g_enable_parallel_linearization) {
794  is_parallel_modification = true;
795  for (auto interval :
796  makeIntervals(size_t(0), cur_chunk_num_tuples, worker_count)) {
797  conversion_threads.push_back(
798               std::async(std::launch::async,
799                          do_work,
800  interval.begin,
801  interval.end,
802  is_parallel_modification,
803  &null_padded_row_idx_vecs[interval.index]));
804  }
805  for (auto& child : conversion_threads) {
806  child.wait();
807  }
808  for (auto& v : null_padded_row_idx_vecs) {
809  std::copy(v.begin(), v.end(), std::back_inserter(null_padded_row_idx_vec));
810  }
811  } else {
812  do_work(size_t(0),
813  cur_chunk_num_tuples,
814  is_parallel_modification,
815  &null_padded_row_idx_vec);
816  }
817  if (!null_padded_row_idx_vec.empty()) {
818  // modify null padded row idxs by referencing the previous row
819  // here we sort row idxs to correctly propagate modified row idxs
820  std::sort(null_padded_row_idx_vec.begin(), null_padded_row_idx_vec.end());
821  for (auto& padded_null_row_idx : null_padded_row_idx_vec) {
822  if (idx_buf_ptr[padded_null_row_idx - 1] > 0) {
823  idx_buf_ptr[padded_null_row_idx] = -idx_buf_ptr[padded_null_row_idx - 1];
824  } else {
825  idx_buf_ptr[padded_null_row_idx] = idx_buf_ptr[padded_null_row_idx - 1];
826  }
827  }
828  }
829  }
830  }
831  cur_sum_num_tuples += cur_chunk_num_tuples;
832  sum_data_buf_size += target_chunk_data_buffer->size();
833  if (target_idx_buf_ptr[*chunk_num_tuple_it] < 0) {
834  null_padded_last_val = true;
835  } else {
836  null_padded_last_val = false;
837  }
838  if (null_padded_first_elem) {
839  sum_data_buf_size -= ArrayNoneEncoder::DEFAULT_NULL_PADDING_SIZE;
840  null_padded_first_elem = false; // set for the next chunk
841  }
842  if (!has_cached_merged_idx_buf && cur_sum_num_tuples == total_num_tuples) {
843  auto merged_index_buffer_ptr =
844  reinterpret_cast<ArrayOffsetT*>(merged_index_buffer_in_cpu->getMemoryPtr());
845  merged_index_buffer_ptr[total_num_tuples] =
846  total_data_buf_size -
847  total_idx_size_modifier; // last index value is total data size;
848  }
849  }
850 
851  // put linearized index buffer to per-device cache
852  AbstractBuffer* merged_index_buffer = nullptr;
853  size_t buf_size = total_idx_buf_size + sizeof(ArrayOffsetT);
854  auto copyBuf =
855  [&device_allocator](
856  int8_t* src, int8_t* dest, size_t buf_size, MemoryLevel memory_level) {
857  if (memory_level == Data_Namespace::CPU_LEVEL) {
858  memcpy((void*)dest, src, buf_size);
859  } else {
860  CHECK(memory_level == Data_Namespace::GPU_LEVEL);
861  device_allocator->copyToDevice(dest, src, buf_size);
862  }
863  };
864  {
865  std::lock_guard<std::mutex> linearized_col_cache_guard(linearized_col_cache_mutex_);
866  auto merged_idx_buf_cache_it = linearized_idx_buf_cache_.find(icd);
867  // for CPU execution, we can use `merged_index_buffer_in_cpu` as is
868     // but for GPU, we have to copy it to the corresponding device
869  if (memory_level == MemoryLevel::GPU_LEVEL) {
870  if (merged_idx_buf_cache_it != linearized_idx_buf_cache_.end()) {
871  auto& merged_idx_buf_cache = merged_idx_buf_cache_it->second;
872  auto merged_idx_buf_it = merged_idx_buf_cache.find(device_id);
873  if (merged_idx_buf_it != merged_idx_buf_cache.end()) {
874  merged_index_buffer = merged_idx_buf_it->second;
875  } else {
876  merged_index_buffer =
877  executor_->getDataMgr()->alloc(memory_level, device_id, buf_size);
878  copyBuf(merged_index_buffer_in_cpu->getMemoryPtr(),
879  merged_index_buffer->getMemoryPtr(),
880  buf_size,
881  memory_level);
882  merged_idx_buf_cache.insert(std::make_pair(device_id, merged_index_buffer));
883  }
884  } else {
885  merged_index_buffer =
886  executor_->getDataMgr()->alloc(memory_level, device_id, buf_size);
887  copyBuf(merged_index_buffer_in_cpu->getMemoryPtr(),
888  merged_index_buffer->getMemoryPtr(),
889  buf_size,
890  memory_level);
891         DeviceMergedChunkMap m;
892         m.insert(std::make_pair(device_id, merged_index_buffer));
893  linearized_idx_buf_cache_.insert(std::make_pair(icd, m));
894  }
895  } else {
896  // `linearlized_temporary_cpu_index_buf_cache_` has this buf
897  merged_index_buffer = merged_index_buffer_in_cpu;
898  }
899  }
900  CHECK(merged_index_buffer);
901  linearization_time_ms += timer_stop(clock_begin);
902  VLOG(2) << "Linearization has been successfully done, elapsed time: "
903  << linearization_time_ms << " ms.";
904  return {merged_data_buffer, merged_index_buffer};
905 }
906 
907 MergedChunk ColumnFetcher::linearizeFixedLenArrayColFrags(
908     int32_t db_id,
909  std::list<std::shared_ptr<Chunk_NS::Chunk>>& chunk_holder,
910  std::list<ChunkIter>& chunk_iter_holder,
911  std::list<std::shared_ptr<Chunk_NS::Chunk>>& local_chunk_holder,
912  std::list<ChunkIter>& local_chunk_iter_holder,
913  std::list<size_t>& local_chunk_num_tuples,
914  MemoryLevel memory_level,
915  const ColumnDescriptor* cd,
916  const int device_id,
917  const size_t total_data_buf_size,
918  const size_t total_idx_buf_size,
919  const size_t total_num_tuples,
920  DeviceAllocator* device_allocator,
921  const size_t thread_idx) const {
922  int64_t linearization_time_ms = 0;
923  auto clock_begin = timer_start();
924  // linearize collected fragments
925  AbstractBuffer* merged_data_buffer = nullptr;
926  bool has_cached_merged_data_buf = false;
927  const InputColDescriptor icd(cd->columnId, cd->tableId, db_id, int(0));
928  {
929  std::lock_guard<std::mutex> linearized_col_cache_guard(linearized_col_cache_mutex_);
930  auto cached_data_buf_cache_it = linearized_data_buf_cache_.find(icd);
931  if (cached_data_buf_cache_it != linearized_data_buf_cache_.end()) {
932  auto& cd_cache = cached_data_buf_cache_it->second;
933  auto cached_data_buf_it = cd_cache.find(device_id);
934  if (cached_data_buf_it != cd_cache.end()) {
935  has_cached_merged_data_buf = true;
936  merged_data_buffer = cached_data_buf_it->second;
937  VLOG(2) << "Recycle merged data buffer for linearized chunks (memory_level: "
938  << getMemoryLevelString(memory_level) << ", device_id: " << device_id
939  << ")";
940  } else {
941  merged_data_buffer =
942  executor_->getDataMgr()->alloc(memory_level, device_id, total_data_buf_size);
943  VLOG(2) << "Allocate " << total_data_buf_size
944  << " bytes of data buffer space for linearized chunks (memory_level: "
945  << getMemoryLevelString(memory_level) << ", device_id: " << device_id
946  << ")";
947  cd_cache.insert(std::make_pair(device_id, merged_data_buffer));
948  }
949  } else {
950       DeviceMergedChunkMap m;
951       merged_data_buffer =
952  executor_->getDataMgr()->alloc(memory_level, device_id, total_data_buf_size);
953  VLOG(2) << "Allocate " << total_data_buf_size
954  << " bytes of data buffer space for linearized chunks (memory_level: "
955  << getMemoryLevelString(memory_level) << ", device_id: " << device_id
956  << ")";
957  m.insert(std::make_pair(device_id, merged_data_buffer));
958  linearized_data_buf_cache_.insert(std::make_pair(icd, m));
959  }
960  }
961  if (!has_cached_merged_data_buf) {
962  size_t sum_data_buf_size = 0;
963  auto chunk_holder_it = local_chunk_holder.begin();
964  auto chunk_iter_holder_it = local_chunk_iter_holder.begin();
965  for (; chunk_holder_it != local_chunk_holder.end();
966  chunk_holder_it++, chunk_iter_holder_it++) {
967       if (g_enable_non_kernel_time_query_interrupt && executor_->checkNonKernelTimeInterrupted()) {
968         throw QueryExecutionError(Executor::ERR_INTERRUPTED);
969       }
970  auto target_chunk = chunk_holder_it->get();
971  auto target_chunk_data_buffer = target_chunk->getBuffer();
972  merged_data_buffer->append(target_chunk_data_buffer->getMemoryPtr(),
973  target_chunk_data_buffer->size(),
974                                  Data_Namespace::CPU_LEVEL,
975                                  device_id);
976  sum_data_buf_size += target_chunk_data_buffer->size();
977  }
978     // check that the merged data buffer accounts for every chunk's bytes
979  CHECK_EQ(total_data_buf_size, sum_data_buf_size);
980  }
981  linearization_time_ms += timer_stop(clock_begin);
982  VLOG(2) << "Linearization has been successfully done, elapsed time: "
983  << linearization_time_ms << " ms.";
984  return {merged_data_buffer, nullptr};
985 }
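// Editorial note: fixed-length arrays carry no index buffer (row i simply starts at
// i * element_size), so linearization reduces to concatenating the per-fragment data
// buffers; that is why this variant returns a null index buffer.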
986 
987 const int8_t* ColumnFetcher::transferColumnIfNeeded(
988     const ColumnarResults* columnar_results,
989  const int col_id,
990  Data_Namespace::DataMgr* data_mgr,
991  const Data_Namespace::MemoryLevel memory_level,
992  const int device_id,
993  DeviceAllocator* device_allocator) {
994  if (!columnar_results) {
995  return nullptr;
996  }
997  const auto& col_buffers = columnar_results->getColumnBuffers();
998  CHECK_LT(static_cast<size_t>(col_id), col_buffers.size());
999  if (memory_level == Data_Namespace::GPU_LEVEL) {
1000  const auto& col_ti = columnar_results->getColumnType(col_id);
1001  size_t num_bytes;
1002  if (col_ti.usesFlatBuffer()) {
1003  CHECK(FlatBufferManager::isFlatBuffer(col_buffers[col_id]));
1004  num_bytes = FlatBufferManager::getBufferSize(col_buffers[col_id]);
1005  } else {
1006  num_bytes = columnar_results->size() * col_ti.get_size();
1007  }
1008  CHECK(device_allocator);
1009  auto gpu_col_buffer = device_allocator->alloc(num_bytes);
1010  device_allocator->copyToDevice(gpu_col_buffer, col_buffers[col_id], num_bytes);
1011  return gpu_col_buffer;
1012  }
1013  return col_buffers[col_id];
1014 }
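// Illustrative call (editorial; assumes the caller already holds a ColumnarResults and a
// DeviceAllocator for the target GPU):
//
//   const int8_t* device_ptr = ColumnFetcher::transferColumnIfNeeded(
//       columnar_results, /*col_id=*/0, executor->getDataMgr(),
//       Data_Namespace::GPU_LEVEL, device_id, device_allocator);
//
// For any non-GPU memory level the function simply returns the host-side column buffer.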
1015 
1016 void ColumnFetcher::addMergedChunkIter(const InputColDescriptor col_desc,
1017                                        const int device_id,
1018  int8_t* chunk_iter_ptr) const {
1019  std::lock_guard<std::mutex> linearize_guard(linearized_col_cache_mutex_);
1020  auto chunk_iter_it = linearized_multi_frag_chunk_iter_cache_.find(col_desc);
1021  if (chunk_iter_it != linearized_multi_frag_chunk_iter_cache_.end()) {
1022  auto iter_device_it = chunk_iter_it->second.find(device_id);
1023  if (iter_device_it == chunk_iter_it->second.end()) {
1024  VLOG(2) << "Additional merged chunk_iter for col_desc (tbl: "
1025  << col_desc.getScanDesc().getTableKey() << ", col: " << col_desc.getColId()
1026  << "), device_id: " << device_id;
1027  chunk_iter_it->second.emplace(device_id, chunk_iter_ptr);
1028  }
1029  } else {
1030  DeviceMergedChunkIterMap iter_m;
1031  iter_m.emplace(device_id, chunk_iter_ptr);
1032  VLOG(2) << "New merged chunk_iter for col_desc (tbl: "
1033  << col_desc.getScanDesc().getTableKey() << ", col: " << col_desc.getColId()
1034  << "), device_id: " << device_id;
1035  linearized_multi_frag_chunk_iter_cache_.emplace(col_desc, iter_m);
1036  }
1037 }
1038 
1039 const int8_t* ColumnFetcher::getChunkiter(const InputColDescriptor col_desc,
1040                                           const int device_id) const {
1041  auto linearized_chunk_iter_it = linearized_multi_frag_chunk_iter_cache_.find(col_desc);
1042  if (linearized_chunk_iter_it != linearized_multi_frag_chunk_iter_cache_.end()) {
1043  auto dev_iter_map_it = linearized_chunk_iter_it->second.find(device_id);
1044  if (dev_iter_map_it != linearized_chunk_iter_it->second.end()) {
1045  VLOG(2) << "Recycle merged chunk_iter for col_desc (tbl: "
1046  << col_desc.getScanDesc().getTableKey() << ", col: " << col_desc.getColId()
1047  << "), device_id: " << device_id;
1048  return dev_iter_map_it->second;
1049  }
1050  }
1051  return nullptr;
1052 }
1053 
1054 ChunkIter ColumnFetcher::prepareChunkIter(AbstractBuffer* merged_data_buf,
1055                                           AbstractBuffer* merged_index_buf,
1056  ChunkIter& chunk_iter,
1057  bool is_true_varlen_type,
1058  const size_t total_num_tuples) const {
1059  ChunkIter merged_chunk_iter;
1060  if (is_true_varlen_type) {
1061  merged_chunk_iter.start_pos = merged_index_buf->getMemoryPtr();
1062  merged_chunk_iter.current_pos = merged_index_buf->getMemoryPtr();
1063  merged_chunk_iter.end_pos = merged_index_buf->getMemoryPtr() +
1064  merged_index_buf->size() - sizeof(StringOffsetT);
1065  merged_chunk_iter.second_buf = merged_data_buf->getMemoryPtr();
1066  } else {
1067  merged_chunk_iter.start_pos = merged_data_buf->getMemoryPtr();
1068  merged_chunk_iter.current_pos = merged_data_buf->getMemoryPtr();
1069  merged_chunk_iter.end_pos = merged_data_buf->getMemoryPtr() + merged_data_buf->size();
1070  merged_chunk_iter.second_buf = nullptr;
1071  }
1072  merged_chunk_iter.num_elems = total_num_tuples;
1073  merged_chunk_iter.skip = chunk_iter.skip;
1074  merged_chunk_iter.skip_size = chunk_iter.skip_size;
1075  merged_chunk_iter.type_info = chunk_iter.type_info;
1076  return merged_chunk_iter;
1077 }
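// Editorial note: mirroring the two branches above, a true var-length iterator walks the
// index buffer (start_pos/current_pos/end_pos) and reaches the payload through second_buf,
// whereas a fixed-width iterator walks the data buffer directly with second_buf left null.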
1078 
1079 void ColumnFetcher::freeLinearizedBuf() {
1080   std::lock_guard<std::mutex> linearized_col_cache_guard(linearized_col_cache_mutex_);
1081  CHECK(executor_);
1082  const auto data_mgr = executor_->getDataMgr();
1083 
1084  if (!linearized_data_buf_cache_.empty()) {
1085  for (auto& kv : linearized_data_buf_cache_) {
1086  for (auto& kv2 : kv.second) {
1087  data_mgr->free(kv2.second);
1088  }
1089  }
1090  }
1091 
1092  if (!linearized_idx_buf_cache_.empty()) {
1093  for (auto& kv : linearized_idx_buf_cache_) {
1094  for (auto& kv2 : kv.second) {
1095  data_mgr->free(kv2.second);
1096  }
1097  }
1098  }
1099 }
1100 
1101 void ColumnFetcher::freeTemporaryCpuLinearizedIdxBuf() {
1102   std::lock_guard<std::mutex> linearized_col_cache_guard(linearized_col_cache_mutex_);
1103   CHECK(executor_);
1104   const auto data_mgr = executor_->getDataMgr();
1105   if (!linearlized_temporary_cpu_index_buf_cache_.empty()) {
1106     for (auto& kv : linearlized_temporary_cpu_index_buf_cache_) {
1107       data_mgr->free(kv.second);
1108  }
1109  }
1110 }
1111 
1112 const int8_t* ColumnFetcher::getResultSetColumn(
1113     const ResultSetPtr& buffer,
1114  const shared::TableKey& table_key,
1115  const int col_id,
1116  const Data_Namespace::MemoryLevel memory_level,
1117  const int device_id,
1118  DeviceAllocator* device_allocator,
1119  const size_t thread_idx) const {
1120  const ColumnarResults* result{nullptr};
1121  {
1122  std::lock_guard<std::mutex> columnar_conversion_guard(columnar_fetch_mutex_);
1123  if (columnarized_table_cache_.empty() ||
1124  !columnarized_table_cache_.count(table_key)) {
1125  columnarized_table_cache_.insert(std::make_pair(
1126  table_key, std::unordered_map<int, std::shared_ptr<const ColumnarResults>>()));
1127  }
1128  auto& frag_id_to_result = columnarized_table_cache_[table_key];
1129  int frag_id = 0;
1130  if (frag_id_to_result.empty() || !frag_id_to_result.count(frag_id)) {
1131  frag_id_to_result.insert(
1132  std::make_pair(frag_id,
1133  std::shared_ptr<const ColumnarResults>(
1134  columnarize_result(executor_->row_set_mem_owner_,
1135  buffer,
1136  executor_->executor_id_,
1137  thread_idx,
1138  frag_id))));
1139  }
1140  CHECK_NE(size_t(0), columnarized_table_cache_.count(table_key));
1141  result = columnarized_table_cache_[table_key][frag_id].get();
1142  }
1143  CHECK_GE(col_id, 0);
1144  return transferColumnIfNeeded(
1145  result, col_id, executor_->getDataMgr(), memory_level, device_id, device_allocator);
1146 }