OmniSciDB  471d68cefb
ColumnFetcher.cpp
1 /*
2  * Copyright 2017 MapD Technologies, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "QueryEngine/ColumnFetcher.h"
18 
19 #include <memory>
20 
23 #include "QueryEngine/Execute.h"
24 #include "Shared/Intervals.h"
25 #include "Shared/likely.h"
26 #include "Shared/sqltypes.h"
27 
28 namespace {
29 
30 const ColumnarResults* columnarize_result(
31  std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
32  const ResultSetPtr& result,
33  const size_t thread_idx,
34  const size_t executor_id,
35  const int frag_id) {
37  CHECK_EQ(0, frag_id);
38 
39  std::vector<SQLTypeInfo> col_types;
40  for (size_t i = 0; i < result->colCount(); ++i) {
41  col_types.push_back(get_logical_type_info(result->getColType(i)));
42  }
43  return new ColumnarResults(
44  row_set_mem_owner, *result, result->colCount(), col_types, executor_id, thread_idx);
45 }
46 
47 std::string getMemoryLevelString(Data_Namespace::MemoryLevel memoryLevel) {
48  switch (memoryLevel) {
49  case DISK_LEVEL:
50  return "DISK_LEVEL";
51  case GPU_LEVEL:
52  return "GPU_LEVEL";
53  case CPU_LEVEL:
54  return "CPU_LEVEL";
55  default:
56  return "UNKNOWN";
57  }
58 }
59 } // namespace
60 
61 ColumnFetcher::ColumnFetcher(Executor* executor, const ColumnCacheMap& column_cache)
62  : executor_(executor), columnarized_table_cache_(column_cache) {}
63 
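//! Gets one chunk's pointer and element count on either CPU or GPU.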
67 std::pair<const int8_t*, size_t> ColumnFetcher::getOneColumnFragment(
68  Executor* executor,
69  const Analyzer::ColumnVar& hash_col,
70  const Fragmenter_Namespace::FragmentInfo& fragment,
71  const Data_Namespace::MemoryLevel effective_mem_lvl,
72  const int device_id,
73  DeviceAllocator* device_allocator,
74  const size_t thread_idx,
75  std::vector<std::shared_ptr<Chunk_NS::Chunk>>& chunks_owner,
76  ColumnCacheMap& column_cache) {
77  static std::mutex columnar_conversion_mutex;
78  auto timer = DEBUG_TIMER(__func__);
79  if (fragment.isEmptyPhysicalFragment()) {
80  return {nullptr, 0};
81  }
82  const auto table_id = hash_col.get_table_id();
83  const auto& catalog = *executor->getCatalog();
84  const auto cd =
85  get_column_descriptor_maybe(hash_col.get_column_id(), table_id, catalog);
86  CHECK(!cd || !(cd->isVirtualCol));
87  const int8_t* col_buff = nullptr;
88  if (cd) { // real table
89  /* chunk_meta_it is used here to retrieve chunk numBytes and
90  numElements. Apparently, their values are often zeros. If we
91  knew how to predict the zero values, calling
92  getChunkMetadataMap could be avoided to skip
93  synthesize_metadata calls. */
94  auto chunk_meta_it = fragment.getChunkMetadataMap().find(hash_col.get_column_id());
95  CHECK(chunk_meta_it != fragment.getChunkMetadataMap().end());
96  ChunkKey chunk_key{catalog.getCurrentDB().dbId,
97  fragment.physicalTableId,
98  hash_col.get_column_id(),
99  fragment.fragmentId};
100  const auto chunk = Chunk_NS::Chunk::getChunk(
101  cd,
102  &catalog.getDataMgr(),
103  chunk_key,
104  effective_mem_lvl,
105  effective_mem_lvl == Data_Namespace::CPU_LEVEL ? 0 : device_id,
106  chunk_meta_it->second->numBytes,
107  chunk_meta_it->second->numElements);
108  chunks_owner.push_back(chunk);
109  CHECK(chunk);
110  auto ab = chunk->getBuffer();
111  CHECK(ab->getMemoryPtr());
112  col_buff = reinterpret_cast<int8_t*>(ab->getMemoryPtr());
113  } else { // temporary table
114  const ColumnarResults* col_frag{nullptr};
115  {
116  std::lock_guard<std::mutex> columnar_conversion_guard(columnar_conversion_mutex);
117  const auto frag_id = fragment.fragmentId;
118  if (column_cache.empty() || !column_cache.count(table_id)) {
119  column_cache.insert(std::make_pair(
120  table_id, std::unordered_map<int, std::shared_ptr<const ColumnarResults>>()));
121  }
122  auto& frag_id_to_result = column_cache[table_id];
123  if (frag_id_to_result.empty() || !frag_id_to_result.count(frag_id)) {
124  frag_id_to_result.insert(
125  std::make_pair(frag_id,
126  std::shared_ptr<const ColumnarResults>(columnarize_result(
127  executor->row_set_mem_owner_,
128  get_temporary_table(executor->temporary_tables_, table_id),
129  executor->executor_id_,
130  thread_idx,
131  frag_id))));
132  }
133  col_frag = column_cache[table_id][frag_id].get();
134  }
135  col_buff = transferColumnIfNeeded(
136  col_frag,
137  hash_col.get_column_id(),
138  &catalog.getDataMgr(),
139  effective_mem_lvl,
140  effective_mem_lvl == Data_Namespace::CPU_LEVEL ? 0 : device_id,
141  device_allocator);
142  }
143  return {col_buff, fragment.getNumTuples()};
144 }
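 // a minimal usage sketch (caller-side names such as `executor`, `hash_col` and `frag`
 // are assumed, not taken from this file): fetch a hash-join column's only fragment
 // into CPU memory and read back its element count:
 //   std::vector<std::shared_ptr<Chunk_NS::Chunk>> chunks_owner;
 //   ColumnCacheMap column_cache;
 //   auto [buf, nrows] = ColumnFetcher::getOneColumnFragment(executor, hash_col, frag,
 //       Data_Namespace::CPU_LEVEL, /*device_id=*/0, /*device_allocator=*/nullptr,
 //       /*thread_idx=*/0, chunks_owner, column_cache);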
145 
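//! Creates a JoinColumn struct containing an array of JoinChunk structs.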
155 JoinColumn ColumnFetcher::makeJoinColumn(
156  Executor* executor,
157  const Analyzer::ColumnVar& hash_col,
158  const std::vector<Fragmenter_Namespace::FragmentInfo>& fragments,
159  const Data_Namespace::MemoryLevel effective_mem_lvl,
160  const int device_id,
161  DeviceAllocator* device_allocator,
162  const size_t thread_idx,
163  std::vector<std::shared_ptr<Chunk_NS::Chunk>>& chunks_owner,
164  std::vector<std::shared_ptr<void>>& malloc_owner,
165  ColumnCacheMap& column_cache) {
166  CHECK(!fragments.empty());
167 
168  size_t col_chunks_buff_sz = sizeof(struct JoinChunk) * fragments.size();
169  // TODO: needs an allocator owner
170  auto col_chunks_buff = reinterpret_cast<int8_t*>(
171  malloc_owner.emplace_back(checked_malloc(col_chunks_buff_sz), free).get());
172  auto join_chunk_array = reinterpret_cast<struct JoinChunk*>(col_chunks_buff);
173 
174  size_t num_elems = 0;
175  size_t num_chunks = 0;
176  for (auto& frag : fragments) {
177  if (g_enable_non_kernel_time_query_interrupt &&
178  executor->checkNonKernelTimeInterrupted()) {
179  throw QueryExecutionError(Executor::ERR_INTERRUPTED);
180  }
181  auto [col_buff, elem_count] = getOneColumnFragment(
182  executor,
183  hash_col,
184  frag,
185  effective_mem_lvl,
186  effective_mem_lvl == Data_Namespace::CPU_LEVEL ? 0 : device_id,
187  device_allocator,
188  thread_idx,
189  chunks_owner,
190  column_cache);
191  if (col_buff != nullptr) {
192  num_elems += elem_count;
193  join_chunk_array[num_chunks] = JoinChunk{col_buff, elem_count};
194  } else {
195  continue;
196  }
197  ++num_chunks;
198  }
199 
200  int elem_sz = hash_col.get_type_info().get_size();
201  CHECK_GT(elem_sz, 0);
202 
203  return {col_chunks_buff,
204  col_chunks_buff_sz,
205  num_chunks,
206  num_elems,
207  static_cast<size_t>(elem_sz)};
208 }
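 // the returned JoinColumn carries the JoinChunk array buffer, its byte size, the number
 // of non-empty chunks, the total element count, and the fixed per-element width of the
 // hash column.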
209 
210 const int8_t* ColumnFetcher::getOneTableColumnFragment(
211  const int table_id,
212  const int frag_id,
213  const int col_id,
214  const std::map<int, const TableFragments*>& all_tables_fragments,
215  std::list<std::shared_ptr<Chunk_NS::Chunk>>& chunk_holder,
216  std::list<ChunkIter>& chunk_iter_holder,
217  const Data_Namespace::MemoryLevel memory_level,
218  const int device_id,
219  DeviceAllocator* allocator) const {
220  const auto fragments_it = all_tables_fragments.find(table_id);
221  CHECK(fragments_it != all_tables_fragments.end());
222  const auto fragments = fragments_it->second;
223  const auto& fragment = (*fragments)[frag_id];
224  if (fragment.isEmptyPhysicalFragment()) {
225  return nullptr;
226  }
227  std::shared_ptr<Chunk_NS::Chunk> chunk;
228  auto chunk_meta_it = fragment.getChunkMetadataMap().find(col_id);
229  CHECK(chunk_meta_it != fragment.getChunkMetadataMap().end());
230  CHECK(table_id > 0);
231  const auto& cat = *executor_->getCatalog();
232  auto cd = get_column_descriptor(col_id, table_id, cat);
233  CHECK(cd);
234  const auto col_type =
235  get_column_type(col_id, table_id, cd, executor_->temporary_tables_);
236  const bool is_real_string =
237  col_type.is_string() && col_type.get_compression() == kENCODING_NONE;
238  const bool is_varlen =
239  is_real_string ||
240  col_type.is_array(); // TODO: should it be col_type.is_varlen_array() ?
241  {
242  ChunkKey chunk_key{
243  cat.getCurrentDB().dbId, fragment.physicalTableId, col_id, fragment.fragmentId};
244  std::unique_ptr<std::lock_guard<std::mutex>> varlen_chunk_lock;
245  if (is_varlen) {
246  varlen_chunk_lock.reset(new std::lock_guard<std::mutex>(varlen_chunk_fetch_mutex_));
247  }
248  chunk = Chunk_NS::Chunk::getChunk(
249  cd,
250  &cat.getDataMgr(),
251  chunk_key,
252  memory_level,
253  memory_level == Data_Namespace::CPU_LEVEL ? 0 : device_id,
254  chunk_meta_it->second->numBytes,
255  chunk_meta_it->second->numElements);
256  std::lock_guard<std::mutex> chunk_list_lock(chunk_list_mutex_);
257  chunk_holder.push_back(chunk);
258  }
259  if (is_varlen) {
260  CHECK_GT(table_id, 0);
261  CHECK(chunk_meta_it != fragment.getChunkMetadataMap().end());
262  chunk_iter_holder.push_back(chunk->begin_iterator(chunk_meta_it->second));
263  auto& chunk_iter = chunk_iter_holder.back();
264  if (memory_level == Data_Namespace::CPU_LEVEL) {
265  return reinterpret_cast<int8_t*>(&chunk_iter);
266  } else {
267  auto ab = chunk->getBuffer();
268  ab->pin();
269  auto& row_set_mem_owner = executor_->getRowSetMemoryOwner();
270  row_set_mem_owner->addVarlenInputBuffer(ab);
271  CHECK_EQ(Data_Namespace::GPU_LEVEL, memory_level);
272  CHECK(allocator);
273  auto chunk_iter_gpu = allocator->alloc(sizeof(ChunkIter));
274  allocator->copyToDevice(
275  chunk_iter_gpu, reinterpret_cast<int8_t*>(&chunk_iter), sizeof(ChunkIter));
276  return chunk_iter_gpu;
277  }
278  } else {
279  auto ab = chunk->getBuffer();
280  CHECK(ab->getMemoryPtr());
281  return ab->getMemoryPtr(); // @TODO(alex) change to use ChunkIter
282  }
283 }
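 // note: for varlen columns the returned pointer is actually a ChunkIter (on CPU, a
 // pointer into chunk_iter_holder; on GPU, a device copy of it), while for fixed-width
 // columns it is the raw chunk buffer itself.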
284 
285 const int8_t* ColumnFetcher::getAllTableColumnFragments(
286  const int table_id,
287  const int col_id,
288  const std::map<int, const TableFragments*>& all_tables_fragments,
289  const Data_Namespace::MemoryLevel memory_level,
290  const int device_id,
291  DeviceAllocator* device_allocator,
292  const size_t thread_idx) const {
293  const auto fragments_it = all_tables_fragments.find(table_id);
294  CHECK(fragments_it != all_tables_fragments.end());
295  const auto fragments = fragments_it->second;
296  const auto frag_count = fragments->size();
297  std::vector<std::unique_ptr<ColumnarResults>> column_frags;
298  const ColumnarResults* table_column = nullptr;
299  const InputColDescriptor col_desc(col_id, table_id, int(0));
301  {
302  std::lock_guard<std::mutex> columnar_conversion_guard(columnar_fetch_mutex_);
303  auto column_it = columnarized_scan_table_cache_.find(col_desc);
304  if (column_it == columnarized_scan_table_cache_.end()) {
305  for (size_t frag_id = 0; frag_id < frag_count; ++frag_id) {
306  if (g_enable_non_kernel_time_query_interrupt &&
307  executor_->checkNonKernelTimeInterrupted()) {
308  throw QueryExecutionError(Executor::ERR_INTERRUPTED);
309  }
310  std::list<std::shared_ptr<Chunk_NS::Chunk>> chunk_holder;
311  std::list<ChunkIter> chunk_iter_holder;
312  const auto& fragment = (*fragments)[frag_id];
313  if (fragment.isEmptyPhysicalFragment()) {
314  continue;
315  }
316  auto chunk_meta_it = fragment.getChunkMetadataMap().find(col_id);
317  CHECK(chunk_meta_it != fragment.getChunkMetadataMap().end());
318  auto col_buffer = getOneTableColumnFragment(table_id,
319  static_cast<int>(frag_id),
320  col_id,
321  all_tables_fragments,
322  chunk_holder,
323  chunk_iter_holder,
324  Data_Namespace::CPU_LEVEL,
325  int(0),
326  device_allocator);
327  column_frags.push_back(
328  std::make_unique<ColumnarResults>(executor_->row_set_mem_owner_,
329  col_buffer,
330  fragment.getNumTuples(),
331  chunk_meta_it->second->sqlType,
332  executor_->executor_id_,
333  thread_idx));
334  }
335  auto merged_results =
336  ColumnarResults::mergeResults(executor_->row_set_mem_owner_, column_frags);
337  table_column = merged_results.get();
338  columnarized_scan_table_cache_.emplace(col_desc, std::move(merged_results));
339  } else {
340  table_column = column_it->second.get();
341  }
342  }
343  return ColumnFetcher::transferColumnIfNeeded(table_column,
344  0,
345  executor_->getDataMgr(),
346  memory_level,
347  device_id,
348  device_allocator);
349 }
350 
351 const int8_t* ColumnFetcher::getResultSetColumn(
352  const InputColDescriptor* col_desc,
353  const Data_Namespace::MemoryLevel memory_level,
354  const int device_id,
355  DeviceAllocator* device_allocator,
356  const size_t thread_idx) const {
357  CHECK(col_desc);
358  const auto table_id = col_desc->getScanDesc().getTableId();
359  return getResultSetColumn(get_temporary_table(executor_->temporary_tables_, table_id),
360  table_id,
361  col_desc->getColId(),
362  memory_level,
363  device_id,
364  device_allocator,
365  thread_idx);
366 }
367 
368 const int8_t* ColumnFetcher::linearizeColumnFragments(
369  const int table_id,
370  const int col_id,
371  const std::map<int, const TableFragments*>& all_tables_fragments,
372  std::list<std::shared_ptr<Chunk_NS::Chunk>>& chunk_holder,
373  std::list<ChunkIter>& chunk_iter_holder,
374  const Data_Namespace::MemoryLevel memory_level,
375  const int device_id,
376  DeviceAllocator* device_allocator,
377  const size_t thread_idx) const {
378  auto timer = DEBUG_TIMER(__func__);
379  const auto fragments_it = all_tables_fragments.find(table_id);
380  CHECK(fragments_it != all_tables_fragments.end());
381  const auto fragments = fragments_it->second;
382  const auto frag_count = fragments->size();
383  const InputColDescriptor col_desc(col_id, table_id, int(0));
384  const auto& cat = *executor_->getCatalog();
385  auto cd = get_column_descriptor(col_id, table_id, cat);
386  CHECK(cd);
388  CHECK_GT(table_id, 0);
389  bool is_varlen_chunk = cd->columnType.is_varlen() && !cd->columnType.is_fixlen_array();
390  size_t total_num_tuples = 0;
391  size_t total_data_buf_size = 0;
392  size_t total_idx_buf_size = 0;
393  {
394  std::lock_guard<std::mutex> linearize_guard(linearized_col_cache_mutex_);
395  auto linearized_iter_it = linearized_multi_frag_chunk_iter_cache_.find(col_desc);
396  if (linearized_iter_it != linearized_multi_frag_chunk_iter_cache_.end()) {
397  if (memory_level == CPU_LEVEL) {
398  // in CPU execution, each kernel can share the merged chunk since they operate in the
399  // same memory space, so we can share the same chunk iter among kernels
400  return getChunkiter(col_desc, 0);
401  } else {
402  // in GPU execution, this becomes an issue when we deploy multiple GPUs,
403  // so we only share the chunk_iter iff kernels are launched on the same GPU device
404  // otherwise we need to separately load merged chunk and its iter
405  // todo(yoonmin): D2D copy of merged chunk and its iter?
406  if (linearized_iter_it->second.find(device_id) !=
407  linearized_iter_it->second.end()) {
408  // note that cached chunk_iter is located on CPU memory space...
409  // we just need to copy it to each device
410  // chunk_iter already contains correct buffer addr depending on execution device
411  auto chunk_iter_gpu = device_allocator->alloc(sizeof(ChunkIter));
412  device_allocator->copyToDevice(
413  chunk_iter_gpu, getChunkiter(col_desc, device_id), sizeof(ChunkIter));
414  return chunk_iter_gpu;
415  }
416  }
417  }
418  }
419 
420  // collect target fragments
421  // basically we load chunks into CPU first, and do the necessary manipulation
422  // to make the semantics of a merged chunk correct
423  std::shared_ptr<Chunk_NS::Chunk> chunk;
424  std::list<std::shared_ptr<Chunk_NS::Chunk>> local_chunk_holder;
425  std::list<ChunkIter> local_chunk_iter_holder;
426  std::list<size_t> local_chunk_num_tuples;
427  {
428  std::lock_guard<std::mutex> linearize_guard(varlen_chunk_fetch_mutex_);
429  for (size_t frag_id = 0; frag_id < frag_count; ++frag_id) {
430  const auto& fragment = (*fragments)[frag_id];
431  if (fragment.isEmptyPhysicalFragment()) {
432  continue;
433  }
434  auto chunk_meta_it = fragment.getChunkMetadataMap().find(col_id);
435  CHECK(chunk_meta_it != fragment.getChunkMetadataMap().end());
436  ChunkKey chunk_key{
437  cat.getCurrentDB().dbId, fragment.physicalTableId, col_id, fragment.fragmentId};
438  chunk = Chunk_NS::Chunk::getChunk(cd,
439  &cat.getDataMgr(),
440  chunk_key,
441  Data_Namespace::CPU_LEVEL,
442  0,
443  chunk_meta_it->second->numBytes,
444  chunk_meta_it->second->numElements);
445  local_chunk_holder.push_back(chunk);
446  auto chunk_iter = chunk->begin_iterator(chunk_meta_it->second);
447  local_chunk_iter_holder.push_back(chunk_iter);
448  local_chunk_num_tuples.push_back(fragment.getNumTuples());
449  total_num_tuples += fragment.getNumTuples();
450  total_data_buf_size += chunk->getBuffer()->size();
451  std::ostringstream oss;
452  oss << "Load chunk for col_name: " << chunk->getColumnDesc()->columnName
453  << ", col_id: " << chunk->getColumnDesc()->columnId << ", Frag-" << frag_id
454  << ", numTuples: " << fragment.getNumTuples()
455  << ", data_size: " << chunk->getBuffer()->size();
456  if (chunk->getIndexBuf()) {
457  auto idx_buf_size = chunk->getIndexBuf()->size() - sizeof(ArrayOffsetT);
458  oss << ", index_size: " << idx_buf_size;
459  total_idx_buf_size += idx_buf_size;
460  }
461  VLOG(2) << oss.str();
462  }
463  }
464 
465  auto& col_ti = cd->columnType;
466  MergedChunk res{nullptr, nullptr};
467  // Linearize the multi-fragmented column depending on its column type
468  // We cover array and non-encoded text columns
469  // Note that a geo column is actually organized as a set of arrays
470  // and each geo object requires a different set of vectors
471  // Here, we linearize each array at a time, so eventually the geo object has a set of
472  // "linearized" arrays
473  {
474  std::lock_guard<std::mutex> linearization_guard(linearization_mutex_);
475  if (col_ti.is_array()) {
476  if (col_ti.is_fixlen_array()) {
477  VLOG(2) << "Linearize fixed-length multi-frag array column (col_id: "
478  << cd->columnId << ", col_name: " << cd->columnName
479  << ", device_type: " << getMemoryLevelString(memory_level)
480  << ", device_id: " << device_id << "): " << cd->columnType.to_string();
481  res = linearizeFixedLenArrayColFrags(cat,
482  chunk_holder,
483  chunk_iter_holder,
484  local_chunk_holder,
485  local_chunk_iter_holder,
486  local_chunk_num_tuples,
487  memory_level,
488  cd,
489  device_id,
490  total_data_buf_size,
491  total_idx_buf_size,
492  total_num_tuples,
493  device_allocator,
494  thread_idx);
495  } else {
496  CHECK(col_ti.is_varlen_array());
497  VLOG(2) << "Linearize variable-length multi-frag array column (col_id: "
498  << cd->columnId << ", col_name: " << cd->columnName
499  << ", device_type: " << getMemoryLevelString(memory_level)
500  << ", device_id: " << device_id << "): " << cd->columnType.to_string();
501  res = linearizeVarLenArrayColFrags(cat,
502  chunk_holder,
503  chunk_iter_holder,
504  local_chunk_holder,
505  local_chunk_iter_holder,
506  local_chunk_num_tuples,
507  memory_level,
508  cd,
509  device_id,
510  total_data_buf_size,
511  total_idx_buf_size,
512  total_num_tuples,
513  device_allocator,
514  thread_idx);
515  }
516  }
517  if (col_ti.is_string() && !col_ti.is_dict_encoded_string()) {
518  VLOG(2) << "Linearize variable-length multi-frag non-encoded text column (col_id: "
519  << cd->columnId << ", col_name: " << cd->columnName
520  << ", device_type: " << getMemoryLevelString(memory_level)
521  << ", device_id: " << device_id << "): " << cd->columnType.to_string();
522  res = linearizeVarLenArrayColFrags(cat,
523  chunk_holder,
524  chunk_iter_holder,
525  local_chunk_holder,
526  local_chunk_iter_holder,
527  local_chunk_num_tuples,
528  memory_level,
529  cd,
530  device_id,
531  total_data_buf_size,
532  total_idx_buf_size,
533  total_num_tuples,
534  device_allocator,
535  thread_idx);
536  }
537  }
538  CHECK(res.first); // check merged data buffer
539  if (!col_ti.is_fixlen_array()) {
540  CHECK(res.second); // check merged index buffer
541  }
542  auto merged_data_buffer = res.first;
543  auto merged_index_buffer = res.second;
544 
545  // prepare ChunkIter for the linearized chunk
546  auto merged_chunk =
547  std::make_shared<Chunk_NS::Chunk>(merged_data_buffer, merged_index_buffer, cd);
548  // to prepare chunk_iter for the merged chunk, we pass one of the local chunk iters
549  // to fill in the necessary metadata that is common to all merged chunks
550  auto merged_chunk_iter = prepareChunkIter(merged_data_buffer,
551  merged_index_buffer,
552  *(local_chunk_iter_holder.rbegin()),
553  is_varlen_chunk,
554  total_num_tuples);
555  {
556  std::lock_guard<std::mutex> chunk_list_lock(chunk_list_mutex_);
557  chunk_holder.push_back(merged_chunk);
558  chunk_iter_holder.push_back(merged_chunk_iter);
559  }
560 
561  auto merged_chunk_iter_ptr = reinterpret_cast<int8_t*>(&(chunk_iter_holder.back()));
562  if (memory_level == MemoryLevel::CPU_LEVEL) {
563  addMergedChunkIter(col_desc, 0, merged_chunk_iter_ptr);
564  return merged_chunk_iter_ptr;
565  } else {
566  CHECK_EQ(Data_Namespace::GPU_LEVEL, memory_level);
567  CHECK(device_allocator);
568  addMergedChunkIter(col_desc, device_id, merged_chunk_iter_ptr);
569  // note that merged_chunk_iter_ptr resides in CPU memory space, but its content
570  // points to the GPU buffer that we allocated for merging,
571  // so we need to copy this chunk_iter to each device explicitly
572  auto chunk_iter_gpu = device_allocator->alloc(sizeof(ChunkIter));
573  device_allocator->copyToDevice(
574  chunk_iter_gpu, merged_chunk_iter_ptr, sizeof(ChunkIter));
575  return chunk_iter_gpu;
576  }
577 }
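 // in short: chunks are first loaded (or reused from cache) on CPU, merged into one data
 // buffer (plus one index buffer for true varlen types), wrapped into a merged Chunk and
 // ChunkIter, and the iter is returned directly on CPU or copied to the target device
 // for GPU execution.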
578 
579 MergedChunk ColumnFetcher::linearizeVarLenArrayColFrags(
580  const Catalog_Namespace::Catalog& cat,
581  std::list<std::shared_ptr<Chunk_NS::Chunk>>& chunk_holder,
582  std::list<ChunkIter>& chunk_iter_holder,
583  std::list<std::shared_ptr<Chunk_NS::Chunk>>& local_chunk_holder,
584  std::list<ChunkIter>& local_chunk_iter_holder,
585  std::list<size_t>& local_chunk_num_tuples,
586  MemoryLevel memory_level,
587  const ColumnDescriptor* cd,
588  const int device_id,
589  const size_t total_data_buf_size,
590  const size_t total_idx_buf_size,
591  const size_t total_num_tuples,
592  DeviceAllocator* device_allocator,
593  const size_t thread_idx) const {
594  // for linearization of a varlen col we have to deal with not only the data buffer
595  // but also its underlying index buffer, which holds the offset of each varlen value
596  // basically we maintain a per-device linearized (data/index) buffer
597  // for the data buffer, we linearize the varlen col's chunks within a device-specific
598  // buffer by just appending each chunk
599  // for the index buffer, we need to not only append each chunk but also modify the
600  // offset values to reflect various conditions like nullness, padding and so on, so we
601  // first append the index buffer in CPU, manipulate it as required, and then copy it
602  // to the specific device if necessary (for GPU execution)
603  AbstractBuffer* merged_index_buffer_in_cpu = nullptr;
604  AbstractBuffer* merged_data_buffer = nullptr;
605  bool has_cached_merged_idx_buf = false;
606  bool has_cached_merged_data_buf = false;
607  const InputColDescriptor icd(cd->columnId, cd->tableId, int(0));
608  // check linearized buffer's cache first
609  // if not exists, alloc necessary buffer space to prepare linearization
610  int64_t linearization_time_ms = 0;
611  auto clock_begin = timer_start();
612  {
613  std::lock_guard<std::mutex> linearized_col_cache_guard(linearized_col_cache_mutex_);
614  auto cached_data_buf_cache_it = linearized_data_buf_cache_.find(icd);
615  if (cached_data_buf_cache_it != linearized_data_buf_cache_.end()) {
616  auto& cd_cache = cached_data_buf_cache_it->second;
617  auto cached_data_buf_it = cd_cache.find(device_id);
618  if (cached_data_buf_it != cd_cache.end()) {
619  has_cached_merged_data_buf = true;
620  merged_data_buffer = cached_data_buf_it->second;
621  VLOG(2) << "Recycle merged data buffer for linearized chunks (memory_level: "
622  << getMemoryLevelString(memory_level) << ", device_id: " << device_id
623  << ")";
624  } else {
625  merged_data_buffer =
626  cat.getDataMgr().alloc(memory_level, device_id, total_data_buf_size);
627  VLOG(2) << "Allocate " << total_data_buf_size
628  << " bytes of data buffer space for linearized chunks (memory_level: "
629  << getMemoryLevelString(memory_level) << ", device_id: " << device_id
630  << ")";
631  cd_cache.insert(std::make_pair(device_id, merged_data_buffer));
632  }
633  } else {
634  DeviceMergedChunkMap m;
635  merged_data_buffer =
636  cat.getDataMgr().alloc(memory_level, device_id, total_data_buf_size);
637  VLOG(2) << "Allocate " << total_data_buf_size
638  << " bytes of data buffer space for linearized chunks (memory_level: "
639  << getMemoryLevelString(memory_level) << ", device_id: " << device_id
640  << ")";
641  m.insert(std::make_pair(device_id, merged_data_buffer));
642  linearized_data_buf_cache_.insert(std::make_pair(icd, m));
643  }
644 
645  auto cached_index_buf_it =
646  linearlized_temporary_cpu_index_buf_cache_.find(cd->columnId);
647  if (cached_index_buf_it != linearlized_temporary_cpu_index_buf_cache_.end()) {
648  has_cached_merged_idx_buf = true;
649  merged_index_buffer_in_cpu = cached_index_buf_it->second;
650  VLOG(2)
651  << "Recycle merged temporary idx buffer for linearized chunks (memory_level: "
652  << getMemoryLevelString(memory_level) << ", device_id: " << device_id << ")";
653  } else {
654  auto idx_buf_size = total_idx_buf_size + sizeof(ArrayOffsetT);
655  merged_index_buffer_in_cpu =
656  cat.getDataMgr().alloc(Data_Namespace::CPU_LEVEL, 0, idx_buf_size);
657  VLOG(2) << "Allocate " << idx_buf_size
658  << " bytes of temporary idx buffer space on CPU for linearized chunks";
659  // just copy the buf addr since we access it via the pointer itself
660  linearlized_temporary_cpu_index_buf_cache_.insert(
661  std::make_pair(cd->columnId, merged_index_buffer_in_cpu));
662  }
663  }
664 
665  // linearize buffers if we don't have corresponding buf in cache
666  size_t sum_data_buf_size = 0;
667  size_t sum_idx_buf_size = 0;
668  size_t cur_sum_num_tuples = 0;
669  size_t total_idx_size_modifier = 0;
670  auto chunk_holder_it = local_chunk_holder.begin();
671  auto chunk_iter_holder_it = local_chunk_iter_holder.begin();
672  auto chunk_num_tuple_it = local_chunk_num_tuples.begin();
673  bool null_padded_first_elem = false;
674  bool null_padded_last_val = false;
675  // before entering the actual linearization part, we first need to check
676  // the overflow case where the sum of index offsets becomes larger than 2GB,
677  // which currently incurs incorrect query results due to negative array offsets
678  // note that we can separate this from the main linearization logic b/c
679  // we just need to check the last few elems of each chunk
680  // todo (yoonmin) : relax this to support larger chunk size (>2GB)
681  for (; chunk_holder_it != local_chunk_holder.end();
682  chunk_holder_it++, chunk_num_tuple_it++) {
683  // check the offset overflow based on the last "valid" offset for each chunk
684  auto target_chunk = chunk_holder_it->get();
685  auto target_chunk_data_buffer = target_chunk->getBuffer();
686  auto target_chunk_idx_buffer = target_chunk->getIndexBuf();
687  auto target_idx_buf_ptr =
688  reinterpret_cast<ArrayOffsetT*>(target_chunk_idx_buffer->getMemoryPtr());
689  auto cur_chunk_num_tuples = *chunk_num_tuple_it;
690  ArrayOffsetT original_offset = -1;
691  size_t cur_idx = cur_chunk_num_tuples;
692  // find the valid (e.g., non-null) offset starting from the last elem
693  while (original_offset < 0) {
694  original_offset = target_idx_buf_ptr[--cur_idx];
695  }
696  ArrayOffsetT new_offset = original_offset + sum_data_buf_size;
697  if (new_offset < 0) {
698  throw std::runtime_error(
699  "Linearization of a variable-length column having chunk size larger than 2GB "
700  "not supported yet");
701  }
702  sum_data_buf_size += target_chunk_data_buffer->size();
703  }
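 // e.g., since ArrayOffsetT is a 32-bit signed int, two chunks carrying roughly 1.5GB of
 // array data each would push the last merged offset past 2^31 - 1 and wrap it negative,
 // which is exactly the situation the pre-check above rejects before any data is moved.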
704  chunk_holder_it = local_chunk_holder.begin();
705  chunk_num_tuple_it = local_chunk_num_tuples.begin();
706  sum_data_buf_size = 0;
707 
708  for (; chunk_holder_it != local_chunk_holder.end();
709  chunk_holder_it++, chunk_iter_holder_it++, chunk_num_tuple_it++) {
710  if (g_enable_non_kernel_time_query_interrupt &&
711  executor_->checkNonKernelTimeInterrupted()) {
712  throw QueryExecutionError(Executor::ERR_INTERRUPTED);
713  }
714  auto target_chunk = chunk_holder_it->get();
715  auto target_chunk_data_buffer = target_chunk->getBuffer();
716  auto cur_chunk_num_tuples = *chunk_num_tuple_it;
717  auto target_chunk_idx_buffer = target_chunk->getIndexBuf();
718  auto target_idx_buf_ptr =
719  reinterpret_cast<ArrayOffsetT*>(target_chunk_idx_buffer->getMemoryPtr());
720  auto idx_buf_size = target_chunk_idx_buffer->size() - sizeof(ArrayOffsetT);
721  auto target_data_buffer_start_ptr = target_chunk_data_buffer->getMemoryPtr();
722  auto target_data_buffer_size = target_chunk_data_buffer->size();
723 
724  // when linearizing idx buffers, we need to consider the following cases
725  // 1. the first idx val is padded (a. null / b. empty varlen arr / c. 1-byte size
726  // varlen arr, i.e., {1})
727  // 2. the last idx val is null
728  // 3. null value(s) located in the middle of the idx buf <-- we don't need to care
729  if (cur_sum_num_tuples > 0 && target_idx_buf_ptr[0] > 0) {
730  null_padded_first_elem = true;
731  target_data_buffer_start_ptr += ArrayNoneEncoder::DEFAULT_NULL_PADDING_SIZE;
732  target_data_buffer_size -= ArrayNoneEncoder::DEFAULT_NULL_PADDING_SIZE;
733  total_idx_size_modifier += ArrayNoneEncoder::DEFAULT_NULL_PADDING_SIZE;
734  }
735  // we linearize data_buf in device-specific buffer
736  if (!has_cached_merged_data_buf) {
737  merged_data_buffer->append(target_data_buffer_start_ptr,
738  target_data_buffer_size,
739  Data_Namespace::CPU_LEVEL,
740  device_id);
741  }
742 
743  if (!has_cached_merged_idx_buf) {
744  // linearize idx buf in CPU first
745  merged_index_buffer_in_cpu->append(target_chunk_idx_buffer->getMemoryPtr(),
746  idx_buf_size,
747  Data_Namespace::CPU_LEVEL,
748  0); // merged_index_buffer_in_cpu resides in CPU
749  auto idx_buf_ptr =
750  reinterpret_cast<ArrayOffsetT*>(merged_index_buffer_in_cpu->getMemoryPtr());
751  // here, we do not need to manipulate the very first idx buf, just let it as is
752  // and modify otherwise (i.e., starting from second chunk idx buf)
753  if (cur_sum_num_tuples > 0) {
754  if (null_padded_last_val) {
755  // case 2. the previous chunk's last index val is null so we need to set this
756  // chunk's first val to be null
757  idx_buf_ptr[cur_sum_num_tuples] = -sum_data_buf_size;
758  }
759  const size_t worker_count = cpu_threads();
760  std::vector<std::future<void>> conversion_threads;
761  std::vector<std::vector<size_t>> null_padded_row_idx_vecs(worker_count,
762  std::vector<size_t>());
763  bool is_parallel_modification = false;
764  std::vector<size_t> null_padded_row_idx_vec;
765  const auto do_work = [&cur_sum_num_tuples,
766  &sum_data_buf_size,
767  &null_padded_first_elem,
768  &idx_buf_ptr](
769  const size_t start,
770  const size_t end,
771  const bool is_parallel_modification,
772  std::vector<size_t>* null_padded_row_idx_vec) {
773  for (size_t i = start; i < end; i++) {
774  if (LIKELY(idx_buf_ptr[cur_sum_num_tuples + i] >= 0)) {
775  if (null_padded_first_elem) {
776  // deal with null padded bytes
777  idx_buf_ptr[cur_sum_num_tuples + i] -=
778  ArrayNoneEncoder::DEFAULT_NULL_PADDING_SIZE;
779  }
780  idx_buf_ptr[cur_sum_num_tuples + i] += sum_data_buf_size;
781  } else {
782  // a null padded row needs to reference the previous row idx, so in
783  // multi-threaded index modification we may suffer from thread
784  // contention when thread-i needs to reference thread-j's row idx; we
785  // therefore collect row idxs for null rows here and deal with them after
786  // this step
787  null_padded_row_idx_vec->push_back(cur_sum_num_tuples + i);
788  }
789  }
790  };
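 // e.g., assume (hypothetically) the already-merged chunks hold sum_data_buf_size = 12
 // bytes and the current chunk's local offsets are {4, 8, 12}: do_work shifts them to
 // {16, 20, 24} so they keep addressing the right bytes of the merged data buffer,
 // after first subtracting DEFAULT_NULL_PADDING_SIZE when this chunk started with a
 // padded null; negative (null) offsets are collected and patched afterwards from the
 // previous row's offset.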
791  if (cur_chunk_num_tuples > g_enable_parallel_linearization) {
792  is_parallel_modification = true;
793  for (auto interval :
794  makeIntervals(size_t(0), cur_chunk_num_tuples, worker_count)) {
795  conversion_threads.push_back(
797  do_work,
798  interval.begin,
799  interval.end,
800  is_parallel_modification,
801  &null_padded_row_idx_vecs[interval.index]));
802  }
803  for (auto& child : conversion_threads) {
804  child.wait();
805  }
806  for (auto& v : null_padded_row_idx_vecs) {
807  std::copy(v.begin(), v.end(), std::back_inserter(null_padded_row_idx_vec));
808  }
809  } else {
810  do_work(size_t(0),
811  cur_chunk_num_tuples,
812  is_parallel_modification,
813  &null_padded_row_idx_vec);
814  }
815  if (!null_padded_row_idx_vec.empty()) {
816  // modify null padded row idxs by referencing the previous row
817  // here we sort row idxs to correctly propagate modified row idxs
818  std::sort(null_padded_row_idx_vec.begin(), null_padded_row_idx_vec.end());
819  for (auto& padded_null_row_idx : null_padded_row_idx_vec) {
820  if (idx_buf_ptr[padded_null_row_idx - 1] > 0) {
821  idx_buf_ptr[padded_null_row_idx] = -idx_buf_ptr[padded_null_row_idx - 1];
822  } else {
823  idx_buf_ptr[padded_null_row_idx] = idx_buf_ptr[padded_null_row_idx - 1];
824  }
825  }
826  }
827  }
828  }
829  sum_idx_buf_size += idx_buf_size;
830  cur_sum_num_tuples += cur_chunk_num_tuples;
831  sum_data_buf_size += target_chunk_data_buffer->size();
832  if (target_idx_buf_ptr[*chunk_num_tuple_it] < 0) {
833  null_padded_last_val = true;
834  } else {
835  null_padded_last_val = false;
836  }
837  if (null_padded_first_elem) {
838  sum_data_buf_size -= ArrayNoneEncoder::DEFAULT_NULL_PADDING_SIZE;
839  null_padded_first_elem = false; // set for the next chunk
840  }
841  if (!has_cached_merged_idx_buf && cur_sum_num_tuples == total_num_tuples) {
842  auto merged_index_buffer_ptr =
843  reinterpret_cast<ArrayOffsetT*>(merged_index_buffer_in_cpu->getMemoryPtr());
844  merged_index_buffer_ptr[total_num_tuples] =
845  total_data_buf_size -
846  total_idx_size_modifier; // last index value is total data size;
847  }
848  }
849 
850  // put linearized index buffer to per-device cache
851  AbstractBuffer* merged_index_buffer = nullptr;
852  size_t buf_size = total_idx_buf_size + sizeof(ArrayOffsetT);
853  auto copyBuf =
854  [&device_allocator](
855  int8_t* src, int8_t* dest, size_t buf_size, MemoryLevel memory_level) {
856  if (memory_level == Data_Namespace::CPU_LEVEL) {
857  memcpy((void*)dest, src, buf_size);
858  } else {
859  CHECK(memory_level == Data_Namespace::GPU_LEVEL);
860  device_allocator->copyToDevice(dest, src, buf_size);
861  }
862  };
863  {
864  std::lock_guard<std::mutex> linearized_col_cache_guard(linearized_col_cache_mutex_);
865  auto merged_idx_buf_cache_it = linearized_idx_buf_cache_.find(icd);
866  // for CPU execution, we can use `merged_index_buffer_in_cpu` as is
867  // but for GPU, we have to copy it to the corresponding device
868  if (memory_level == MemoryLevel::GPU_LEVEL) {
869  if (merged_idx_buf_cache_it != linearized_idx_buf_cache_.end()) {
870  auto& merged_idx_buf_cache = merged_idx_buf_cache_it->second;
871  auto merged_idx_buf_it = merged_idx_buf_cache.find(device_id);
872  if (merged_idx_buf_it != merged_idx_buf_cache.end()) {
873  merged_index_buffer = merged_idx_buf_it->second;
874  } else {
875  merged_index_buffer = cat.getDataMgr().alloc(memory_level, device_id, buf_size);
876  copyBuf(merged_index_buffer_in_cpu->getMemoryPtr(),
877  merged_index_buffer->getMemoryPtr(),
878  buf_size,
879  memory_level);
880  merged_idx_buf_cache.insert(std::make_pair(device_id, merged_index_buffer));
881  }
882  } else {
883  merged_index_buffer = cat.getDataMgr().alloc(memory_level, device_id, buf_size);
884  copyBuf(merged_index_buffer_in_cpu->getMemoryPtr(),
885  merged_index_buffer->getMemoryPtr(),
886  buf_size,
887  memory_level);
888  DeviceMergedChunkMap m;
889  m.insert(std::make_pair(device_id, merged_index_buffer));
890  linearized_idx_buf_cache_.insert(std::make_pair(icd, m));
891  }
892  } else {
893  // `linearlized_temporary_cpu_index_buf_cache_` has this buf
894  merged_index_buffer = merged_index_buffer_in_cpu;
895  }
896  }
897  CHECK(merged_index_buffer);
898  linearization_time_ms += timer_stop(clock_begin);
899  VLOG(2) << "Linearization has been successfully done, elapsed time: "
900  << linearization_time_ms << " ms.";
901  return {merged_data_buffer, merged_index_buffer};
902 }
903 
904 MergedChunk ColumnFetcher::linearizeFixedLenArrayColFrags(
905  const Catalog_Namespace::Catalog& cat,
906  std::list<std::shared_ptr<Chunk_NS::Chunk>>& chunk_holder,
907  std::list<ChunkIter>& chunk_iter_holder,
908  std::list<std::shared_ptr<Chunk_NS::Chunk>>& local_chunk_holder,
909  std::list<ChunkIter>& local_chunk_iter_holder,
910  std::list<size_t>& local_chunk_num_tuples,
911  MemoryLevel memory_level,
912  const ColumnDescriptor* cd,
913  const int device_id,
914  const size_t total_data_buf_size,
915  const size_t total_idx_buf_size,
916  const size_t total_num_tuples,
917  DeviceAllocator* device_allocator,
918  const size_t thread_idx) const {
919  int64_t linearization_time_ms = 0;
920  auto clock_begin = timer_start();
921  // linearize collected fragments
922  AbstractBuffer* merged_data_buffer = nullptr;
923  bool has_cached_merged_data_buf = false;
924  const InputColDescriptor icd(cd->columnId, cd->tableId, int(0));
925  {
926  std::lock_guard<std::mutex> linearized_col_cache_guard(linearized_col_cache_mutex_);
927  auto cached_data_buf_cache_it = linearized_data_buf_cache_.find(icd);
928  if (cached_data_buf_cache_it != linearized_data_buf_cache_.end()) {
929  auto& cd_cache = cached_data_buf_cache_it->second;
930  auto cached_data_buf_it = cd_cache.find(device_id);
931  if (cached_data_buf_it != cd_cache.end()) {
932  has_cached_merged_data_buf = true;
933  merged_data_buffer = cached_data_buf_it->second;
934  VLOG(2) << "Recycle merged data buffer for linearized chunks (memory_level: "
935  << getMemoryLevelString(memory_level) << ", device_id: " << device_id
936  << ")";
937  } else {
938  merged_data_buffer =
939  cat.getDataMgr().alloc(memory_level, device_id, total_data_buf_size);
940  VLOG(2) << "Allocate " << total_data_buf_size
941  << " bytes of data buffer space for linearized chunks (memory_level: "
942  << getMemoryLevelString(memory_level) << ", device_id: " << device_id
943  << ")";
944  cd_cache.insert(std::make_pair(device_id, merged_data_buffer));
945  }
946  } else {
947  DeviceMergedChunkMap m;
948  merged_data_buffer =
949  cat.getDataMgr().alloc(memory_level, device_id, total_data_buf_size);
950  VLOG(2) << "Allocate " << total_data_buf_size
951  << " bytes of data buffer space for linearized chunks (memory_level: "
952  << getMemoryLevelString(memory_level) << ", device_id: " << device_id
953  << ")";
954  m.insert(std::make_pair(device_id, merged_data_buffer));
955  linearized_data_buf_cache_.insert(std::make_pair(icd, m));
956  }
957  }
958  if (!has_cached_merged_data_buf) {
959  size_t sum_data_buf_size = 0;
960  auto chunk_holder_it = local_chunk_holder.begin();
961  auto chunk_iter_holder_it = local_chunk_iter_holder.begin();
962  for (; chunk_holder_it != local_chunk_holder.end();
963  chunk_holder_it++, chunk_iter_holder_it++) {
964  if (g_enable_non_kernel_time_query_interrupt && executor_->checkNonKernelTimeInterrupted()) {
965  throw QueryExecutionError(Executor::ERR_INTERRUPTED);
966  }
967  auto target_chunk = chunk_holder_it->get();
968  auto target_chunk_data_buffer = target_chunk->getBuffer();
969  merged_data_buffer->append(target_chunk_data_buffer->getMemoryPtr(),
970  target_chunk_data_buffer->size(),
971  Data_Namespace::CPU_LEVEL,
972  device_id);
973  sum_data_buf_size += target_chunk_data_buffer->size();
974  }
975  // check whether each chunk's data buffer is clean under chunk merging
976  CHECK_EQ(total_data_buf_size, sum_data_buf_size);
977  }
978  linearization_time_ms += timer_stop(clock_begin);
979  VLOG(2) << "Linearization has been successfully done, elapsed time: "
980  << linearization_time_ms << " ms.";
981  return {merged_data_buffer, nullptr};
982 }
983 
984 const int8_t* ColumnFetcher::transferColumnIfNeeded(
985  const ColumnarResults* columnar_results,
986  const int col_id,
987  Data_Namespace::DataMgr* data_mgr,
988  const Data_Namespace::MemoryLevel memory_level,
989  const int device_id,
990  DeviceAllocator* device_allocator) {
991  if (!columnar_results) {
992  return nullptr;
993  }
994  const auto& col_buffers = columnar_results->getColumnBuffers();
995  CHECK_LT(static_cast<size_t>(col_id), col_buffers.size());
996  if (memory_level == Data_Namespace::GPU_LEVEL) {
997  const auto& col_ti = columnar_results->getColumnType(col_id);
998  const auto num_bytes = columnar_results->size() * col_ti.get_size();
999  CHECK(device_allocator);
1000  auto gpu_col_buffer = device_allocator->alloc(num_bytes);
1001  device_allocator->copyToDevice(gpu_col_buffer, col_buffers[col_id], num_bytes);
1002  return gpu_col_buffer;
1003  }
1004  return col_buffers[col_id];
1005 }
1006 
1007 void ColumnFetcher::addMergedChunkIter(const InputColDescriptor col_desc,
1008  const int device_id,
1009  int8_t* chunk_iter_ptr) const {
1010  std::lock_guard<std::mutex> linearize_guard(linearized_col_cache_mutex_);
1011  auto chunk_iter_it = linearized_multi_frag_chunk_iter_cache_.find(col_desc);
1012  if (chunk_iter_it != linearized_multi_frag_chunk_iter_cache_.end()) {
1013  auto iter_device_it = chunk_iter_it->second.find(device_id);
1014  if (iter_device_it == chunk_iter_it->second.end()) {
1015  VLOG(2) << "Additional merged chunk_iter for col_desc (tbl: "
1016  << col_desc.getScanDesc().getTableId() << ", col: " << col_desc.getColId()
1017  << "), device_id: " << device_id;
1018  chunk_iter_it->second.emplace(device_id, chunk_iter_ptr);
1019  }
1020  } else {
1021  DeviceMergedChunkIterMap iter_m;
1022  iter_m.emplace(device_id, chunk_iter_ptr);
1023  VLOG(2) << "New merged chunk_iter for col_desc (tbl: "
1024  << col_desc.getScanDesc().getTableId() << ", col: " << col_desc.getColId()
1025  << "), device_id: " << device_id;
1026  linearized_multi_frag_chunk_iter_cache_.emplace(col_desc, iter_m);
1027  }
1028 }
1029 
1030 const int8_t* ColumnFetcher::getChunkiter(const InputColDescriptor col_desc,
1031  const int device_id) const {
1032  auto linearized_chunk_iter_it = linearized_multi_frag_chunk_iter_cache_.find(col_desc);
1033  if (linearized_chunk_iter_it != linearized_multi_frag_chunk_iter_cache_.end()) {
1034  auto dev_iter_map_it = linearized_chunk_iter_it->second.find(device_id);
1035  if (dev_iter_map_it != linearized_chunk_iter_it->second.end()) {
1036  VLOG(2) << "Recycle merged chunk_iter for col_desc (tbl: "
1037  << col_desc.getScanDesc().getTableId() << ", col: " << col_desc.getColId()
1038  << "), device_id: " << device_id;
1039  return dev_iter_map_it->second;
1040  }
1041  }
1042  return nullptr;
1043 }
1044 
1045 ChunkIter ColumnFetcher::prepareChunkIter(AbstractBuffer* merged_data_buf,
1046  AbstractBuffer* merged_index_buf,
1047  ChunkIter& chunk_iter,
1048  bool is_true_varlen_type,
1049  const size_t total_num_tuples) const {
1050  ChunkIter merged_chunk_iter;
1051  if (is_true_varlen_type) {
1052  merged_chunk_iter.start_pos = merged_index_buf->getMemoryPtr();
1053  merged_chunk_iter.current_pos = merged_index_buf->getMemoryPtr();
1054  merged_chunk_iter.end_pos = merged_index_buf->getMemoryPtr() +
1055  merged_index_buf->size() - sizeof(StringOffsetT);
1056  merged_chunk_iter.second_buf = merged_data_buf->getMemoryPtr();
1057  } else {
1058  merged_chunk_iter.start_pos = merged_data_buf->getMemoryPtr();
1059  merged_chunk_iter.current_pos = merged_data_buf->getMemoryPtr();
1060  merged_chunk_iter.end_pos = merged_data_buf->getMemoryPtr() + merged_data_buf->size();
1061  merged_chunk_iter.second_buf = nullptr;
1062  }
1063  merged_chunk_iter.num_elems = total_num_tuples;
1064  merged_chunk_iter.skip = chunk_iter.skip;
1065  merged_chunk_iter.skip_size = chunk_iter.skip_size;
1066  merged_chunk_iter.type_info = chunk_iter.type_info;
1067  return merged_chunk_iter;
1068 }
1069 
1070 void ColumnFetcher::freeLinearizedBuf() {
1071  std::lock_guard<std::mutex> linearized_col_cache_guard(linearized_col_cache_mutex_);
1072  const auto& cat = *executor_->getCatalog();
1073  auto& data_mgr = cat.getDataMgr();
1074 
1075  if (!linearized_data_buf_cache_.empty()) {
1076  for (auto& kv : linearized_data_buf_cache_) {
1077  for (auto& kv2 : kv.second) {
1078  data_mgr.free(kv2.second);
1079  }
1080  }
1081  }
1082 
1083  if (!linearized_idx_buf_cache_.empty()) {
1084  for (auto& kv : linearized_idx_buf_cache_) {
1085  for (auto& kv2 : kv.second) {
1086  data_mgr.free(kv2.second);
1087  }
1088  }
1089  }
1090 }
1091 
1092 void ColumnFetcher::freeTemporaryCpuLinearizedIdxBuf() {
1093  std::lock_guard<std::mutex> linearized_col_cache_guard(linearized_col_cache_mutex_);
1094  const auto& cat = *executor_->getCatalog();
1095  auto& data_mgr = cat.getDataMgr();
1096  if (!linearlized_temporary_cpu_index_buf_cache_.empty()) {
1097  for (auto& kv : linearlized_temporary_cpu_index_buf_cache_) {
1098  data_mgr.free(kv.second);
1099  }
1100  }
1101 }
1102 
1103 const int8_t* ColumnFetcher::getResultSetColumn(
1104  const ResultSetPtr& buffer,
1105  const int table_id,
1106  const int col_id,
1107  const Data_Namespace::MemoryLevel memory_level,
1108  const int device_id,
1109  DeviceAllocator* device_allocator,
1110  const size_t thread_idx) const {
1111  const ColumnarResults* result{nullptr};
1112  {
1113  std::lock_guard<std::mutex> columnar_conversion_guard(columnar_fetch_mutex_);
1114  if (columnarized_table_cache_.empty() || !columnarized_table_cache_.count(table_id)) {
1115  columnarized_table_cache_.insert(std::make_pair(
1116  table_id, std::unordered_map<int, std::shared_ptr<const ColumnarResults>>()));
1117  }
1118  auto& frag_id_to_result = columnarized_table_cache_[table_id];
1119  int frag_id = 0;
1120  if (frag_id_to_result.empty() || !frag_id_to_result.count(frag_id)) {
1121  frag_id_to_result.insert(
1122  std::make_pair(frag_id,
1123  std::shared_ptr<const ColumnarResults>(
1124  columnarize_result(executor_->row_set_mem_owner_,
1125  buffer,
1126  executor_->executor_id_,
1127  thread_idx,
1128  frag_id))));
1129  }
1130  CHECK_NE(size_t(0), columnarized_table_cache_.count(table_id));
1131  result = columnarized_table_cache_[table_id][frag_id].get();
1132  }
1133  CHECK_GE(col_id, 0);
1134  return transferColumnIfNeeded(
1135  result, col_id, executor_->getDataMgr(), memory_level, device_id, device_allocator);
1136 }
int get_table_id() const
Definition: Analyzer.h:193
#define CHECK_EQ(x, y)
Definition: Logger.h:217
std::vector< int > ChunkKey
Definition: types.h:37
int8_t * start_pos
Definition: ChunkIter.h:33
HOST DEVICE int get_size() const
Definition: sqltypes.h:339
std::unordered_map< int, int8_t * > DeviceMergedChunkIterMap
std::string cat(Ts &&...args)
int8_t * current_pos
Definition: ChunkIter.h:32
SQLTypeInfo type_info
Definition: ChunkIter.h:30
std::unordered_map< int, AbstractBuffer * > DeviceMergedChunkMap
static const int32_t ERR_INTERRUPTED
Definition: Execute.h:1163
class for a per-database catalog. also includes metadata for the current database and the current use...
Definition: Catalog.h:113
static std::unique_ptr< ColumnarResults > mergeResults(const std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner, const std::vector< std::unique_ptr< ColumnarResults >> &sub_results)
std::mutex varlen_chunk_fetch_mutex_
Data_Namespace::DataMgr & getDataMgr() const
Definition: Catalog.h:224
void freeLinearizedBuf()
static JoinColumn makeJoinColumn(Executor *executor, const Analyzer::ColumnVar &hash_col, const std::vector< Fragmenter_Namespace::FragmentInfo > &fragments, const Data_Namespace::MemoryLevel effective_mem_lvl, const int device_id, DeviceAllocator *device_allocator, const size_t thread_idx, std::vector< std::shared_ptr< Chunk_NS::Chunk >> &chunks_owner, std::vector< std::shared_ptr< void >> &malloc_owner, ColumnCacheMap &column_cache)
Creates a JoinColumn struct containing an array of JoinChunk structs.
virtual int8_t * getMemoryPtr()=0
std::mutex columnar_fetch_mutex_
DEVICE void sort(ARGS &&...args)
Definition: gpu_enabled.h:105
const SQLTypeInfo get_column_type(const int col_id, const int table_id, const ColumnDescriptor *cd, const TemporaryTables *temporary_tables)
Definition: Execute.h:234
#define CHECK_GE(x, y)
Definition: Logger.h:222
const int8_t * getResultSetColumn(const InputColDescriptor *col_desc, const Data_Namespace::MemoryLevel memory_level, const int device_id, DeviceAllocator *device_allocator, const size_t thread_idx) const
SQLTypeInfo get_logical_type_info(const SQLTypeInfo &type_info)
Definition: sqltypes.h:1049
TypeR::rep timer_stop(Type clock_begin)
Definition: measure.h:48
virtual int8_t * alloc(const size_t num_bytes)=0
ColumnCacheMap columnarized_table_cache_
Constants for Builtin SQL Types supported by OmniSci.
const int8_t * linearizeColumnFragments(const int table_id, const int col_id, const std::map< int, const TableFragments * > &all_tables_fragments, std::list< std::shared_ptr< Chunk_NS::Chunk >> &chunk_holder, std::list< ChunkIter > &chunk_iter_holder, const Data_Namespace::MemoryLevel memory_level, const int device_id, DeviceAllocator *device_allocator, const size_t thread_idx) const
std::shared_ptr< ResultSet > ResultSetPtr
const int8_t * getChunkiter(const InputColDescriptor col_desc, const int device_id=0) const
Executor * executor_
Intervals< T > makeIntervals(T begin, T end, std::size_t n_workers)
Definition: Intervals.h:116
bool g_enable_non_kernel_time_query_interrupt
Definition: Execute.cpp:119
const int8_t * col_buff
std::mutex linearization_mutex_
#define CHECK_GT(x, y)
Definition: Logger.h:221
const ColumnarResults * columnarize_result(std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner, const ResultSetPtr &result, const size_t thread_idx, const size_t executor_id, const int frag_id)
std::unordered_map< InputColDescriptor, std::unique_ptr< const ColumnarResults > > columnarized_scan_table_cache_
int32_t StringOffsetT
Definition: sqltypes.h:1075
const int8_t * getOneTableColumnFragment(const int table_id, const int frag_id, const int col_id, const std::map< int, const TableFragments * > &all_tables_fragments, std::list< std::shared_ptr< Chunk_NS::Chunk >> &chunk_holder, std::list< ChunkIter > &chunk_iter_holder, const Data_Namespace::MemoryLevel memory_level, const int device_id, DeviceAllocator *device_allocator) const
__device__ bool check_interrupt()
std::unordered_map< InputColDescriptor, DeviceMergedChunkMap > linearized_data_buf_cache_
const ResultSetPtr & get_temporary_table(const TemporaryTables *temporary_tables, const int table_id)
Definition: Execute.h:226
void addMergedChunkIter(const InputColDescriptor col_desc, const int device_id, int8_t *chunk_iter_ptr) const
std::unordered_map< int, AbstractBuffer * > linearlized_temporary_cpu_index_buf_cache_
virtual void copyToDevice(void *device_dst, const void *host_src, const size_t num_bytes) const =0
future< Result > async(Fn &&fn, Args &&...args)
int8_t * end_pos
Definition: ChunkIter.h:34
size_t num_elems
Definition: ChunkIter.h:37
const ColumnDescriptor * get_column_descriptor_maybe(const int col_id, const int table_id, const Catalog_Namespace::Catalog &cat)
Definition: Execute.h:218
void * checked_malloc(const size_t size)
Definition: checked_alloc.h:45
DEVICE auto copy(ARGS &&...args)
Definition: gpu_enabled.h:51
#define INJECT_TIMER(DESC)
Definition: measure.h:93
#define CHECK_NE(x, y)
Definition: Logger.h:218
const size_t size() const
MergedChunk linearizeVarLenArrayColFrags(const Catalog_Namespace::Catalog &cat, std::list< std::shared_ptr< Chunk_NS::Chunk >> &chunk_holder, std::list< ChunkIter > &chunk_iter_holder, std::list< std::shared_ptr< Chunk_NS::Chunk >> &local_chunk_holder, std::list< ChunkIter > &local_chunk_iter_holder, std::list< size_t > &local_chunk_num_tuples, MemoryLevel memory_level, const ColumnDescriptor *cd, const int device_id, const size_t total_data_buf_size, const size_t total_idx_buf_size, const size_t total_num_tuples, DeviceAllocator *device_allocator, const size_t thread_idx) const
MergedChunk linearizeFixedLenArrayColFrags(const Catalog_Namespace::Catalog &cat, std::list< std::shared_ptr< Chunk_NS::Chunk >> &chunk_holder, std::list< ChunkIter > &chunk_iter_holder, std::list< std::shared_ptr< Chunk_NS::Chunk >> &local_chunk_holder, std::list< ChunkIter > &local_chunk_iter_holder, std::list< size_t > &local_chunk_num_tuples, MemoryLevel memory_level, const ColumnDescriptor *cd, const int device_id, const size_t total_data_buf_size, const size_t total_idx_buf_size, const size_t total_num_tuples, DeviceAllocator *device_allocator, const size_t thread_idx) const
Used by Fragmenter classes to store info about each fragment - the fragment id and number of tuples(r...
Definition: Fragmenter.h:77
int getColId() const
int getTableId() const
#define LIKELY(x)
Definition: likely.h:24
std::mutex linearized_col_cache_mutex_
static std::shared_ptr< Chunk > getChunk(const ColumnDescriptor *cd, DataMgr *data_mgr, const ChunkKey &key, const MemoryLevel mem_level, const int deviceId, const size_t num_bytes, const size_t num_elems)
Definition: Chunk.cpp:28
An AbstractBuffer is a unit of data management for a data manager.
specifies the content in-memory of a row in the column metadata table
ColumnFetcher(Executor *executor, const ColumnCacheMap &column_cache)
const ChunkMetadataMap & getChunkMetadataMap() const
std::mutex chunk_list_mutex_
const SQLTypeInfo & get_type_info() const
Definition: Analyzer.h:77
void freeTemporaryCpuLinearizedIdxBuf()
std::unordered_map< int, std::unordered_map< int, std::shared_ptr< const ColumnarResults >>> ColumnCacheMap
InputSourceType getSourceType() const
int skip_size
Definition: ChunkIter.h:36
#define CHECK_LT(x, y)
Definition: Logger.h:219
static const int8_t * transferColumnIfNeeded(const ColumnarResults *columnar_results, const int col_id, Data_Namespace::DataMgr *data_mgr, const Data_Namespace::MemoryLevel memory_level, const int device_id, DeviceAllocator *device_allocator)
static std::pair< const int8_t *, size_t > getOneColumnFragment(Executor *executor, const Analyzer::ColumnVar &hash_col, const Fragmenter_Namespace::FragmentInfo &fragment, const Data_Namespace::MemoryLevel effective_mem_lvl, const int device_id, DeviceAllocator *device_allocator, const size_t thread_idx, std::vector< std::shared_ptr< Chunk_NS::Chunk >> &chunks_owner, ColumnCacheMap &column_cache)
Gets one chunk's pointer and element count on either CPU or GPU.
static constexpr size_t DEFAULT_NULL_PADDING_SIZE
int32_t ArrayOffsetT
Definition: sqltypes.h:1076
std::pair< AbstractBuffer *, AbstractBuffer * > MergedChunk
Definition: ColumnFetcher.h:47
int8_t * second_buf
Definition: ChunkIter.h:31
virtual void append(int8_t *src, const size_t num_bytes, const MemoryLevel src_buffer_type=CPU_LEVEL, const int device_id=-1)=0
int skip
Definition: ChunkIter.h:35
#define CHECK(condition)
Definition: Logger.h:209
#define DEBUG_TIMER(name)
Definition: Logger.h:352
std::string getMemoryLevelString(Data_Namespace::MemoryLevel memoryLevel)
ChunkIter prepareChunkIter(AbstractBuffer *merged_data_buf, AbstractBuffer *merged_index_buf, ChunkIter &chunk_iter, bool is_true_varlen_type, const size_t total_num_tuples) const
std::unordered_map< InputColDescriptor, DeviceMergedChunkIterMap > linearized_multi_frag_chunk_iter_cache_
int get_column_id() const
Definition: Analyzer.h:194
std::unordered_map< InputColDescriptor, DeviceMergedChunkMap > linearized_idx_buf_cache_
unencoded array encoder
size_t num_elems
const std::vector< int8_t * > & getColumnBuffers() const
int cpu_threads()
Definition: thread_count.h:24
const InputDescriptor & getScanDesc() const
size_t g_enable_parallel_linearization
Definition: Execute.cpp:137
#define VLOG(n)
Definition: Logger.h:303
Type timer_start()
Definition: measure.h:42
const int8_t * getAllTableColumnFragments(const int table_id, const int col_id, const std::map< int, const TableFragments * > &all_tables_fragments, const Data_Namespace::MemoryLevel memory_level, const int device_id, DeviceAllocator *device_allocator, const size_t thread_idx) const
const ColumnDescriptor * get_column_descriptor(const int col_id, const int table_id, const Catalog_Namespace::Catalog &cat)
Definition: Execute.h:189
const SQLTypeInfo & getColumnType(const int col_id) const
AbstractBuffer * alloc(const MemoryLevel memoryLevel, const int deviceId, const size_t numBytes)
Definition: DataMgr.cpp:517