OmniSciDB  8fa3bf436f
ColumnFetcher.cpp
/*
 * Copyright 2017 MapD Technologies, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "QueryEngine/ColumnFetcher.h"

#include <memory>

#include "QueryEngine/Execute.h"

namespace {

const ColumnarResults* columnarize_result(
    std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
    const ResultSetPtr& result,
    const size_t thread_idx,
    const int frag_id) {
  INJECT_TIMER(columnarize_result);
  CHECK_EQ(0, frag_id);

  std::vector<SQLTypeInfo> col_types;
  for (size_t i = 0; i < result->colCount(); ++i) {
    col_types.push_back(get_logical_type_info(result->getColType(i)));
  }
  return new ColumnarResults(
      row_set_mem_owner, *result, result->colCount(), col_types, thread_idx);
}

}  // namespace
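
// Note on ownership: columnarize_result() returns a raw pointer, and every
// call site below immediately wraps it in a std::shared_ptr<const
// ColumnarResults> owned by a column cache, so the raw return does not leak.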

ColumnFetcher::ColumnFetcher(Executor* executor, const ColumnCacheMap& column_cache)
    : executor_(executor), columnarized_table_cache_(column_cache) {}
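
// The fetcher starts from a copy of the caller's column cache. ColumnCacheMap
// maps table_id -> fragment_id -> a shared, already-columnarized result, so
// repeated fetches of the same intermediate column can reuse prior
// conversions.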

//! Gets one chunk's pointer and element count on either CPU or GPU.
std::pair<const int8_t*, size_t> ColumnFetcher::getOneColumnFragment(
    Executor* executor,
    const Analyzer::ColumnVar& hash_col,
    const Fragmenter_Namespace::FragmentInfo& fragment,
    const Data_Namespace::MemoryLevel effective_mem_lvl,
    const int device_id,
    DeviceAllocator* device_allocator,
    const size_t thread_idx,
    std::vector<std::shared_ptr<Chunk_NS::Chunk>>& chunks_owner,
    ColumnCacheMap& column_cache) {
  static std::mutex columnar_conversion_mutex;
  auto timer = DEBUG_TIMER(__func__);
  if (fragment.isEmptyPhysicalFragment()) {
    return {nullptr, 0};
  }
  const auto table_id = hash_col.get_table_id();
  const auto& catalog = *executor->getCatalog();
  const auto cd =
      get_column_descriptor_maybe(hash_col.get_column_id(), table_id, catalog);
  CHECK(!cd || !(cd->isVirtualCol));
  const int8_t* col_buff = nullptr;
  if (cd) {  // real table
    /* chunk_meta_it is used here to retrieve chunk numBytes and
       numElements. Apparently, their values are often zeros. If we
       knew how to predict the zero values, calling
       getChunkMetadataMap could be avoided to skip
       synthesize_metadata calls. */
    auto chunk_meta_it = fragment.getChunkMetadataMap().find(hash_col.get_column_id());
    CHECK(chunk_meta_it != fragment.getChunkMetadataMap().end());
    ChunkKey chunk_key{catalog.getCurrentDB().dbId,
                       fragment.physicalTableId,
                       hash_col.get_column_id(),
                       fragment.fragmentId};
    const auto chunk = Chunk_NS::Chunk::getChunk(
        cd,
        &catalog.getDataMgr(),
        chunk_key,
        effective_mem_lvl,
        effective_mem_lvl == Data_Namespace::CPU_LEVEL ? 0 : device_id,
        chunk_meta_it->second->numBytes,
        chunk_meta_it->second->numElements);
    chunks_owner.push_back(chunk);
    CHECK(chunk);
    auto ab = chunk->getBuffer();
    CHECK(ab->getMemoryPtr());
    col_buff = reinterpret_cast<int8_t*>(ab->getMemoryPtr());
  } else {  // temporary table
    const ColumnarResults* col_frag{nullptr};
    {
      std::lock_guard<std::mutex> columnar_conversion_guard(columnar_conversion_mutex);
      const auto frag_id = fragment.fragmentId;
      if (column_cache.empty() || !column_cache.count(table_id)) {
        column_cache.insert(std::make_pair(
            table_id,
            std::unordered_map<int, std::shared_ptr<const ColumnarResults>>()));
      }
      auto& frag_id_to_result = column_cache[table_id];
      if (frag_id_to_result.empty() || !frag_id_to_result.count(frag_id)) {
        frag_id_to_result.insert(
            std::make_pair(frag_id,
                           std::shared_ptr<const ColumnarResults>(columnarize_result(
                               executor->row_set_mem_owner_,
                               get_temporary_table(executor->temporary_tables_, table_id),
                               thread_idx,
                               frag_id))));
      }
      col_frag = column_cache[table_id][frag_id].get();
    }
    col_buff = transferColumnIfNeeded(
        col_frag,
        hash_col.get_column_id(),
        &catalog.getDataMgr(),
        effective_mem_lvl,
        effective_mem_lvl == Data_Namespace::CPU_LEVEL ? 0 : device_id,
        device_allocator);
  }
  return {col_buff, fragment.getNumTuples()};
}
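
// A minimal usage sketch (hypothetical caller; `executor`, `hash_col` and
// `fragment_info` are assumed to be in scope): fetch one fragment of a join
// key column into CPU memory and read back its element count.
//
//   std::vector<std::shared_ptr<Chunk_NS::Chunk>> chunks_owner;
//   ColumnCacheMap column_cache;
//   auto [col_buff, elem_count] =
//       ColumnFetcher::getOneColumnFragment(executor,
//                                           hash_col,
//                                           fragment_info,
//                                           Data_Namespace::CPU_LEVEL,
//                                           /*device_id=*/0,
//                                           /*device_allocator=*/nullptr,
//                                           /*thread_idx=*/0,
//                                           chunks_owner,
//                                           column_cache);
//   // chunks_owner keeps the underlying chunk alive while col_buff is in use.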

//! Creates a JoinColumn struct containing an array of JoinChunk structs.
JoinColumn ColumnFetcher::makeJoinColumn(
    Executor* executor,
    const Analyzer::ColumnVar& hash_col,
    const std::vector<Fragmenter_Namespace::FragmentInfo>& fragments,
    const Data_Namespace::MemoryLevel effective_mem_lvl,
    const int device_id,
    DeviceAllocator* device_allocator,
    const size_t thread_idx,
    std::vector<std::shared_ptr<Chunk_NS::Chunk>>& chunks_owner,
    std::vector<std::shared_ptr<void>>& malloc_owner,
    ColumnCacheMap& column_cache) {
  CHECK(!fragments.empty());

  size_t col_chunks_buff_sz = sizeof(struct JoinChunk) * fragments.size();
  // TODO: needs an allocator owner
  auto col_chunks_buff = reinterpret_cast<int8_t*>(
      malloc_owner.emplace_back(checked_malloc(col_chunks_buff_sz), free).get());
  auto join_chunk_array = reinterpret_cast<struct JoinChunk*>(col_chunks_buff);

  size_t num_elems = 0;
  size_t num_chunks = 0;
  for (auto& frag : fragments) {
    if (g_enable_non_kernel_time_query_interrupt && check_interrupt()) {
      throw QueryExecutionError(Executor::ERR_INTERRUPTED);
    }
    auto [col_buff, elem_count] = getOneColumnFragment(
        executor,
        hash_col,
        frag,
        effective_mem_lvl,
        effective_mem_lvl == Data_Namespace::CPU_LEVEL ? 0 : device_id,
        device_allocator,
        thread_idx,
        chunks_owner,
        column_cache);
    if (col_buff != nullptr) {
      num_elems += elem_count;
      join_chunk_array[num_chunks] = JoinChunk{col_buff, elem_count};
    } else {
      continue;
    }
    ++num_chunks;
  }

  int elem_sz = hash_col.get_type_info().get_size();
  CHECK_GT(elem_sz, 0);

  return {col_chunks_buff,
          col_chunks_buff_sz,
          num_chunks,
          num_elems,
          static_cast<size_t>(elem_sz)};
}
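
// Layout note: the returned JoinColumn points at col_chunks_buff, a malloc'd
// array of JoinChunk structs where entry i carries the buffer pointer and
// element count of one non-empty fragment. Empty fragments are skipped, so
// num_chunks can be smaller than fragments.size().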

const int8_t* ColumnFetcher::getOneTableColumnFragment(
    const int table_id,
    const int frag_id,
    const int col_id,
    const std::map<int, const TableFragments*>& all_tables_fragments,
    std::list<std::shared_ptr<Chunk_NS::Chunk>>& chunk_holder,
    std::list<ChunkIter>& chunk_iter_holder,
    const Data_Namespace::MemoryLevel memory_level,
    const int device_id,
    DeviceAllocator* allocator) const {
  static std::mutex varlen_chunk_mutex;  // TODO(alex): remove
  static std::mutex chunk_list_mutex;
  const auto fragments_it = all_tables_fragments.find(table_id);
  CHECK(fragments_it != all_tables_fragments.end());
  const auto fragments = fragments_it->second;
  const auto& fragment = (*fragments)[frag_id];
  if (fragment.isEmptyPhysicalFragment()) {
    return nullptr;
  }
  std::shared_ptr<Chunk_NS::Chunk> chunk;
  auto chunk_meta_it = fragment.getChunkMetadataMap().find(col_id);
  CHECK(chunk_meta_it != fragment.getChunkMetadataMap().end());
  CHECK(table_id > 0);
  const auto& cat = *executor_->getCatalog();
  auto cd = get_column_descriptor(col_id, table_id, cat);
  CHECK(cd);
  const auto col_type =
      get_column_type(col_id, table_id, cd, executor_->temporary_tables_);
  const bool is_real_string =
      col_type.is_string() && col_type.get_compression() == kENCODING_NONE;
  const bool is_varlen =
      is_real_string ||
      col_type.is_array();  // TODO: should it be col_type.is_varlen_array() ?
  {
    ChunkKey chunk_key{
        cat.getCurrentDB().dbId, fragment.physicalTableId, col_id, fragment.fragmentId};
    std::unique_ptr<std::lock_guard<std::mutex>> varlen_chunk_lock;
    if (is_varlen) {
      varlen_chunk_lock.reset(new std::lock_guard<std::mutex>(varlen_chunk_mutex));
    }
    chunk = Chunk_NS::Chunk::getChunk(
        cd,
        &cat.getDataMgr(),
        chunk_key,
        memory_level,
        memory_level == Data_Namespace::CPU_LEVEL ? 0 : device_id,
        chunk_meta_it->second->numBytes,
        chunk_meta_it->second->numElements);
    std::lock_guard<std::mutex> chunk_list_lock(chunk_list_mutex);
    chunk_holder.push_back(chunk);
  }
  if (is_varlen) {
    CHECK_GT(table_id, 0);
    CHECK(chunk_meta_it != fragment.getChunkMetadataMap().end());
    chunk_iter_holder.push_back(chunk->begin_iterator(chunk_meta_it->second));
    auto& chunk_iter = chunk_iter_holder.back();
    if (memory_level == Data_Namespace::CPU_LEVEL) {
      return reinterpret_cast<int8_t*>(&chunk_iter);
    } else {
      auto ab = chunk->getBuffer();
      ab->pin();
      auto& row_set_mem_owner = executor_->getRowSetMemoryOwner();
      row_set_mem_owner->addVarlenInputBuffer(ab);
      CHECK_EQ(Data_Namespace::GPU_LEVEL, memory_level);
      CHECK(allocator);
      auto chunk_iter_gpu = allocator->alloc(sizeof(ChunkIter));
      allocator->copyToDevice(
          chunk_iter_gpu, reinterpret_cast<int8_t*>(&chunk_iter), sizeof(ChunkIter));
      return chunk_iter_gpu;
    }
  } else {
    auto ab = chunk->getBuffer();
    CHECK(ab->getMemoryPtr());
    return ab->getMemoryPtr();  // @TODO(alex) change to use ChunkIter
  }
}
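
// Return convention: for varlen columns (none-encoded strings and arrays) the
// returned pointer is a ChunkIter -- or a device copy of one on GPU -- rather
// than the column data itself; for fixed-width columns it is the raw chunk
// buffer. Callers must know which case applies; see the is_varlen branches.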

const int8_t* ColumnFetcher::getAllTableColumnFragments(
    const int table_id,
    const int col_id,
    const std::map<int, const TableFragments*>& all_tables_fragments,
    const Data_Namespace::MemoryLevel memory_level,
    const int device_id,
    DeviceAllocator* device_allocator,
    const size_t thread_idx) const {
  const auto fragments_it = all_tables_fragments.find(table_id);
  CHECK(fragments_it != all_tables_fragments.end());
  const auto fragments = fragments_it->second;
  const auto frag_count = fragments->size();
  std::vector<std::unique_ptr<ColumnarResults>> column_frags;
  const ColumnarResults* table_column = nullptr;
  const InputColDescriptor col_desc(col_id, table_id, int(0));
  CHECK(col_desc.getScanDesc().getSourceType() == InputSourceType::TABLE);
  {
    std::lock_guard<std::mutex> columnar_conversion_guard(columnar_fetch_mutex_);
    auto column_it = columnarized_scan_table_cache_.find(col_desc);
    if (column_it == columnarized_scan_table_cache_.end()) {
      for (size_t frag_id = 0; frag_id < frag_count; ++frag_id) {
        if (g_enable_non_kernel_time_query_interrupt && check_interrupt()) {
          throw QueryExecutionError(Executor::ERR_INTERRUPTED);
        }
        std::list<std::shared_ptr<Chunk_NS::Chunk>> chunk_holder;
        std::list<ChunkIter> chunk_iter_holder;
        const auto& fragment = (*fragments)[frag_id];
        if (fragment.isEmptyPhysicalFragment()) {
          continue;
        }
        auto chunk_meta_it = fragment.getChunkMetadataMap().find(col_id);
        CHECK(chunk_meta_it != fragment.getChunkMetadataMap().end());
        auto col_buffer = getOneTableColumnFragment(table_id,
                                                    static_cast<int>(frag_id),
                                                    col_id,
                                                    all_tables_fragments,
                                                    chunk_holder,
                                                    chunk_iter_holder,
                                                    Data_Namespace::CPU_LEVEL,
                                                    int(0),
                                                    device_allocator);
        column_frags.push_back(
            std::make_unique<ColumnarResults>(executor_->row_set_mem_owner_,
                                              col_buffer,
                                              fragment.getNumTuples(),
                                              chunk_meta_it->second->sqlType,
                                              thread_idx));
      }
      auto merged_results =
          ColumnarResults::mergeResults(executor_->row_set_mem_owner_, column_frags);
      table_column = merged_results.get();
      columnarized_scan_table_cache_.emplace(col_desc, std::move(merged_results));
    } else {
      table_column = column_it->second.get();
    }
  }
  return ColumnFetcher::transferColumnIfNeeded(table_column,
                                               0,
                                               &executor_->getCatalog()->getDataMgr(),
                                               memory_level,
                                               device_id,
                                               device_allocator);
}
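
// All fragments are materialized at CPU_LEVEL first and merged into a single
// ColumnarResults, cached in columnarized_scan_table_cache_ under the column's
// InputColDescriptor. Only the merged column is then shipped to the device by
// transferColumnIfNeeded() when memory_level is GPU_LEVEL.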

const int8_t* ColumnFetcher::getResultSetColumn(
    const InputColDescriptor* col_desc,
    const Data_Namespace::MemoryLevel memory_level,
    const int device_id,
    DeviceAllocator* device_allocator,
    const size_t thread_idx) const {
  CHECK(col_desc);
  const auto table_id = col_desc->getScanDesc().getTableId();
  return getResultSetColumn(get_temporary_table(executor_->temporary_tables_, table_id),
                            table_id,
                            col_desc->getColId(),
                            memory_level,
                            device_id,
                            device_allocator,
                            thread_idx);
}
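
// This overload only resolves the temporary table behind col_desc and forwards
// to the ResultSetPtr overload below, which does the actual columnarization
// and, if needed, the device transfer.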

const int8_t* ColumnFetcher::linearizeColumnFragments(
    const int table_id,
    const int col_id,
    const std::map<int, const TableFragments*>& all_tables_fragments,
    std::list<std::shared_ptr<Chunk_NS::Chunk>>& chunk_holder,
    std::list<ChunkIter>& chunk_iter_holder,
    const Data_Namespace::MemoryLevel memory_level,
    const int device_id,
    DeviceAllocator* device_allocator,
    const size_t thread_idx) const {
  // todo(yoonmin): True varlen col linearization
  const auto fragments_it = all_tables_fragments.find(table_id);
  CHECK(fragments_it != all_tables_fragments.end());
  const auto fragments = fragments_it->second;
  const auto frag_count = fragments->size();
  const InputColDescriptor col_desc(col_id, table_id, int(0));
  const auto& cat = *executor_->getCatalog();
  auto cd = get_column_descriptor(col_id, table_id, cat);
  CHECK(cd);
  CHECK(col_desc.getScanDesc().getSourceType() == InputSourceType::TABLE);
  CHECK_GT(table_id, 0);
  size_t total_num_tuples = 0;
  size_t total_data_buf_size = 0;
  size_t total_idx_buf_size = 0;

  std::lock_guard<std::mutex> linearize_guard(columnar_fetch_mutex_);
  auto linearized_iter_it = linearized_multi_frag_chunk_iter_cache_.find(col_desc);
  if (linearized_iter_it != linearized_multi_frag_chunk_iter_cache_.end()) {
    if (memory_level == CPU_LEVEL) {
      return getChunkiter(col_desc, 0);
    } else {
      // todo(yoonmin): D2D copy of merged chunk and its iter?
      if (linearized_iter_it->second.find(device_id) !=
          linearized_iter_it->second.end()) {
        auto chunk_iter_gpu = device_allocator->alloc(sizeof(ChunkIter));
        device_allocator->copyToDevice(
            chunk_iter_gpu, getChunkiter(col_desc, device_id), sizeof(ChunkIter));
        return chunk_iter_gpu;
      }
    }
  }

  // collect target fragments
  // in GPU execution, we first load chunks in CPU, and only merge them in GPU
  std::shared_ptr<Chunk_NS::Chunk> chunk;
  std::list<std::shared_ptr<Chunk_NS::Chunk>> local_chunk_holder;
  std::list<ChunkIter> local_chunk_iter_holder;
  for (size_t frag_id = 0; frag_id < frag_count; ++frag_id) {
    const auto& fragment = (*fragments)[frag_id];
    if (fragment.isEmptyPhysicalFragment()) {
      continue;
    }
    auto chunk_meta_it = fragment.getChunkMetadataMap().find(col_id);
    CHECK(chunk_meta_it != fragment.getChunkMetadataMap().end());
    ChunkKey chunk_key{
        cat.getCurrentDB().dbId, fragment.physicalTableId, col_id, fragment.fragmentId};
    chunk = Chunk_NS::Chunk::getChunk(cd,
                                      &cat.getDataMgr(),
                                      chunk_key,
                                      Data_Namespace::CPU_LEVEL,
                                      0,
                                      chunk_meta_it->second->numBytes,
                                      chunk_meta_it->second->numElements);
    local_chunk_holder.push_back(chunk);
    local_chunk_iter_holder.push_back(chunk->begin_iterator(chunk_meta_it->second));
    total_num_tuples += fragment.getNumTuples();
    total_data_buf_size += chunk->getBuffer()->size();
    if (chunk->getIndexBuf()) {
      total_idx_buf_size += chunk->getIndexBuf()->size();
    }
  }
  // linearize collected fragments
  // todo(yoonmin): parallelize this step
  auto merged_chunk_buffer =
      cat.getDataMgr().alloc(memory_level,
                             memory_level == Data_Namespace::CPU_LEVEL ? 0 : device_id,
                             total_data_buf_size);
  size_t sum_chunk_sizes = 0;
  for (auto chunk_holder_it = local_chunk_holder.begin();
       chunk_holder_it != local_chunk_holder.end();
       chunk_holder_it++) {
    if (g_enable_non_kernel_time_query_interrupt && check_interrupt()) {
      cat.getDataMgr().free(merged_chunk_buffer);
      throw QueryExecutionError(Executor::ERR_INTERRUPTED);
    }
    auto target_chunk = chunk_holder_it->get();
    auto target_chunk_buffer = target_chunk->getBuffer();
    merged_chunk_buffer->append(
        target_chunk_buffer->getMemoryPtr(),
        target_chunk_buffer->size(),
        Data_Namespace::MemoryLevel::CPU_LEVEL,
        memory_level == Data_Namespace::CPU_LEVEL ? 0 : device_id);
    sum_chunk_sizes += target_chunk_buffer->size();
  }
  // check whether each chunk's data buffer is clean under chunk merging
  CHECK_EQ(total_data_buf_size, sum_chunk_sizes);

  // make ChunkIter for the linearized chunk
  // todo(yoonmin): cache for merged chunk?
  auto merged_chunk = std::make_shared<Chunk_NS::Chunk>(merged_chunk_buffer, nullptr, cd);
  auto merged_chunk_iter = prepareChunkIter(
      merged_chunk_buffer, *(chunk_iter_holder.begin()), total_num_tuples);
  chunk_holder.push_back(merged_chunk);
  chunk_iter_holder.push_back(merged_chunk_iter);
  auto merged_chunk_iter_ptr = reinterpret_cast<int8_t*>(&(chunk_iter_holder.back()));
  if (memory_level == MemoryLevel::CPU_LEVEL) {
    addMergedChunk(col_desc, 0, merged_chunk, merged_chunk_iter_ptr);
    return merged_chunk_iter_ptr;
  } else {
    CHECK_EQ(Data_Namespace::GPU_LEVEL, memory_level);
    CHECK(device_allocator);
    auto ab = merged_chunk->getBuffer();
    ab->pin();
    addMergedChunk(col_desc, device_id, merged_chunk, merged_chunk_iter_ptr);
    auto chunk_iter_gpu = device_allocator->alloc(sizeof(ChunkIter));
    device_allocator->copyToDevice(
        chunk_iter_gpu, merged_chunk_iter_ptr, sizeof(ChunkIter));
    return chunk_iter_gpu;
  }
}
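
// Linearization invariant: merged_chunk_buffer is sized from the sum of the
// per-fragment data buffers, and the CHECK_EQ(total_data_buf_size,
// sum_chunk_sizes) above verifies the appends copied exactly that many bytes.
// total_idx_buf_size is computed but not consumed yet; true varlen column
// linearization is still a todo (see the comment at the top of the function).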

const int8_t* ColumnFetcher::transferColumnIfNeeded(
    const ColumnarResults* columnar_results,
    const int col_id,
    Data_Namespace::DataMgr* data_mgr,
    const Data_Namespace::MemoryLevel memory_level,
    const int device_id,
    DeviceAllocator* device_allocator) {
  if (!columnar_results) {
    return nullptr;
  }
  const auto& col_buffers = columnar_results->getColumnBuffers();
  CHECK_LT(static_cast<size_t>(col_id), col_buffers.size());
  if (memory_level == Data_Namespace::GPU_LEVEL) {
    const auto& col_ti = columnar_results->getColumnType(col_id);
    const auto num_bytes = columnar_results->size() * col_ti.get_size();
    CHECK(device_allocator);
    auto gpu_col_buffer = device_allocator->alloc(num_bytes);
    device_allocator->copyToDevice(gpu_col_buffer, col_buffers[col_id], num_bytes);
    return gpu_col_buffer;
  }
  return col_buffers[col_id];
}
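
// Sizing note: the GPU path above computes num_bytes as row count times
// col_ti.get_size(), which presumes a fixed-width element type; get_size()
// does not describe the payload of none-encoded varlen data, which instead
// travels through the ChunkIter paths in getOneTableColumnFragment().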

void ColumnFetcher::addMergedChunk(const InputColDescriptor col_desc,
                                   const int device_id,
                                   std::shared_ptr<Chunk_NS::Chunk> chunk_ptr,
                                   int8_t* chunk_iter_ptr) const {
  // 1. merged_chunk_ptr
  auto chunk_it = linearized_multi_frag_table_cache_.find(col_desc);
  if (chunk_it != linearized_multi_frag_table_cache_.end()) {
    auto chunk_device_it = chunk_it->second.find(device_id);
    if (chunk_device_it == chunk_it->second.end()) {
      VLOG(2) << "Additional merged chunk for col_desc (tbl: "
              << col_desc.getScanDesc().getTableId() << ", col: " << col_desc.getColId()
              << "), device: " << device_id;
      chunk_it->second.emplace(device_id, chunk_ptr);
    }
  } else {
    DeviceMergedChunkMap chunk_m;
    chunk_m.emplace(device_id, chunk_ptr);
    VLOG(2) << "New merged chunk for col_desc (tbl: "
            << col_desc.getScanDesc().getTableId() << ", col: " << col_desc.getColId()
            << "), device: " << device_id;
    linearized_multi_frag_table_cache_.emplace(col_desc, chunk_m);
  }

  // 2. merged_chunk_iter_ptr
  auto iter_it = linearized_multi_frag_chunk_iter_cache_.find(col_desc);
  if (iter_it != linearized_multi_frag_chunk_iter_cache_.end()) {
    auto iter_device_it = iter_it->second.find(device_id);
    if (iter_device_it == iter_it->second.end()) {
      VLOG(2) << "Additional merged chunk_iter for col_desc (tbl: "
              << col_desc.getScanDesc().getTableId() << ", col: " << col_desc.getColId()
              << "), device: " << device_id;
      iter_it->second.emplace(device_id, chunk_iter_ptr);
    }
  } else {
    DeviceMergedChunkIterMap iter_m;
    iter_m.emplace(device_id, chunk_iter_ptr);
    VLOG(2) << "New merged chunk_iter for col_desc (tbl: "
            << col_desc.getScanDesc().getTableId() << ", col: " << col_desc.getColId()
            << "), device: " << device_id;
    linearized_multi_frag_chunk_iter_cache_.emplace(col_desc, iter_m);
  }
}
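
// Both caches are two-level maps keyed by InputColDescriptor and then by
// device_id, so a linearized column can hold a distinct merged chunk (and
// iterator) per device. Existing entries are deliberately left untouched;
// only missing (col_desc, device_id) slots are filled in.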

const int8_t* ColumnFetcher::getChunkiter(const InputColDescriptor col_desc,
                                          const int device_id) const {
  auto linearized_chunk_iter_it = linearized_multi_frag_chunk_iter_cache_.find(col_desc);
  if (linearized_chunk_iter_it != linearized_multi_frag_chunk_iter_cache_.end()) {
    auto dev_iter_map_it = linearized_chunk_iter_it->second.find(device_id);
    if (dev_iter_map_it != linearized_chunk_iter_it->second.end()) {
      VLOG(2) << "Recycle merged chunk_iter for col_desc (tbl: "
              << col_desc.getScanDesc().getTableId() << ", col: " << col_desc.getColId()
              << "), device: " << device_id;
      return dev_iter_map_it->second;
    }
  }
  return nullptr;
}

ChunkIter ColumnFetcher::prepareChunkIter(AbstractBuffer* merged,
                                          ChunkIter& chunk_iter,
                                          const size_t total_num_tuples) const {
  ChunkIter merged_chunk_iter;
  merged_chunk_iter.start_pos = merged->getMemoryPtr();
  merged_chunk_iter.current_pos = merged_chunk_iter.start_pos;
  merged_chunk_iter.end_pos = merged->getMemoryPtr() + merged->size();
  merged_chunk_iter.num_elems = total_num_tuples;
  merged_chunk_iter.skip = chunk_iter.skip;
  merged_chunk_iter.skip_size = chunk_iter.skip_size;
  merged_chunk_iter.type_info = chunk_iter.type_info;
  return merged_chunk_iter;
}
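
// Only the buffer bounds and the element count are recomputed for the merged
// chunk; skip, skip_size and type_info are inherited from a source fragment's
// iterator, since linearization does not change the element encoding.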

const int8_t* ColumnFetcher::getResultSetColumn(
    const ResultSetPtr& buffer,
    const int table_id,
    const int col_id,
    const Data_Namespace::MemoryLevel memory_level,
    const int device_id,
    DeviceAllocator* device_allocator,
    const size_t thread_idx) const {
  const ColumnarResults* result{nullptr};
  {
    std::lock_guard<std::mutex> columnar_conversion_guard(columnar_fetch_mutex_);
    if (columnarized_table_cache_.empty() ||
        !columnarized_table_cache_.count(table_id)) {
      columnarized_table_cache_.insert(std::make_pair(
          table_id, std::unordered_map<int, std::shared_ptr<const ColumnarResults>>()));
    }
    auto& frag_id_to_result = columnarized_table_cache_[table_id];
    int frag_id = 0;
    if (frag_id_to_result.empty() || !frag_id_to_result.count(frag_id)) {
      frag_id_to_result.insert(std::make_pair(
          frag_id,
          std::shared_ptr<const ColumnarResults>(columnarize_result(
              executor_->row_set_mem_owner_, buffer, thread_idx, frag_id))));
    }
    CHECK_NE(size_t(0), columnarized_table_cache_.count(table_id));
    result = columnarized_table_cache_[table_id][frag_id].get();
  }
  CHECK_GE(col_id, 0);
  return transferColumnIfNeeded(result,
                                col_id,
                                &executor_->getCatalog()->getDataMgr(),
                                memory_level,
                                device_id,
                                device_allocator);
}
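
// Result sets are columnarized as a single fragment: frag_id is fixed to 0
// here, which matches the CHECK_EQ(0, frag_id) in columnarize_result() above.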