OmniSciDB  ab4938a6a3
ColumnFetcher.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2017 MapD Technologies, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
18 
19 #include <memory>
20 
21 #include "QueryEngine/Execute.h"
22 
23 ColumnFetcher::ColumnFetcher(Executor* executor, const ColumnCacheMap& column_cache)
24  : executor_(executor), columnarized_table_cache_(column_cache) {}
25 
29 std::pair<const int8_t*, size_t> ColumnFetcher::getOneColumnFragment(
30  Executor* executor,
31  const Analyzer::ColumnVar& hash_col,
32  const Fragmenter_Namespace::FragmentInfo& fragment,
33  const Data_Namespace::MemoryLevel effective_mem_lvl,
34  const int device_id,
35  std::vector<std::shared_ptr<Chunk_NS::Chunk>>& chunks_owner,
36  ColumnCacheMap& column_cache) {
37  static std::mutex columnar_conversion_mutex;
38  if (fragment.isEmptyPhysicalFragment()) {
39  return {nullptr, 0};
40  }
41  auto chunk_meta_it = fragment.getChunkMetadataMap().find(hash_col.get_column_id());
42  CHECK(chunk_meta_it != fragment.getChunkMetadataMap().end());
43  const auto& catalog = *executor->getCatalog();
44  const auto cd = get_column_descriptor_maybe(
45  hash_col.get_column_id(), hash_col.get_table_id(), catalog);
46  CHECK(!cd || !(cd->isVirtualCol));
47  const int8_t* col_buff = nullptr;
48  if (cd) { // real table
49  ChunkKey chunk_key{catalog.getCurrentDB().dbId,
50  fragment.physicalTableId,
51  hash_col.get_column_id(),
52  fragment.fragmentId};
53  const auto chunk = Chunk_NS::Chunk::getChunk(
54  cd,
55  &catalog.getDataMgr(),
56  chunk_key,
57  effective_mem_lvl,
58  effective_mem_lvl == Data_Namespace::CPU_LEVEL ? 0 : device_id,
59  chunk_meta_it->second->numBytes,
60  chunk_meta_it->second->numElements);
61  chunks_owner.push_back(chunk);
62  CHECK(chunk);
63  auto ab = chunk->getBuffer();
64  CHECK(ab->getMemoryPtr());
65  col_buff = reinterpret_cast<int8_t*>(ab->getMemoryPtr());
66  } else { // temporary table
67  const ColumnarResults* col_frag{nullptr};
68  {
69  std::lock_guard<std::mutex> columnar_conversion_guard(columnar_conversion_mutex);
70  const auto table_id = hash_col.get_table_id();
71  const auto frag_id = fragment.fragmentId;
72  if (column_cache.empty() || !column_cache.count(table_id)) {
73  column_cache.insert(std::make_pair(
74  table_id, std::unordered_map<int, std::shared_ptr<const ColumnarResults>>()));
75  }
76  auto& frag_id_to_result = column_cache[table_id];
77  if (frag_id_to_result.empty() || !frag_id_to_result.count(frag_id)) {
78  frag_id_to_result.insert(std::make_pair(
79  frag_id,
80  std::shared_ptr<const ColumnarResults>(columnarize_result(
81  executor->row_set_mem_owner_,
82  get_temporary_table(executor->temporary_tables_, hash_col.get_table_id()),
83  frag_id))));
84  }
85  col_frag = column_cache[table_id][frag_id].get();
86  }
87  col_buff = transferColumnIfNeeded(
88  col_frag,
89  hash_col.get_column_id(),
90  &catalog.getDataMgr(),
91  effective_mem_lvl,
92  effective_mem_lvl == Data_Namespace::CPU_LEVEL ? 0 : device_id);
93  }
94  return {col_buff, fragment.getNumTuples()};
95 }
96 
107  Executor* executor,
108  const Analyzer::ColumnVar& hash_col,
109  const std::vector<Fragmenter_Namespace::FragmentInfo>& fragments,
110  const Data_Namespace::MemoryLevel effective_mem_lvl,
111  const int device_id,
112  std::vector<std::shared_ptr<Chunk_NS::Chunk>>& chunks_owner,
113  std::vector<std::shared_ptr<void>>& malloc_owner,
114  ColumnCacheMap& column_cache) {
115  CHECK(!fragments.empty());
116 
117  size_t col_chunks_buff_sz = sizeof(struct JoinChunk) * fragments.size();
118  // TODO: needs an allocator owner
119  auto col_chunks_buff = reinterpret_cast<int8_t*>(
120  malloc_owner.emplace_back(checked_malloc(col_chunks_buff_sz), free).get());
121  auto join_chunk_array = reinterpret_cast<struct JoinChunk*>(col_chunks_buff);
122 
123  size_t num_elems = 0;
124  size_t num_chunks = 0;
125  for (auto& frag : fragments) {
126  auto [col_buff, elem_count] = getOneColumnFragment(
127  executor,
128  hash_col,
129  frag,
130  effective_mem_lvl,
131  effective_mem_lvl == Data_Namespace::CPU_LEVEL ? 0 : device_id,
132  chunks_owner,
133  column_cache);
134  if (col_buff != nullptr) {
135  num_elems += elem_count;
136  join_chunk_array[num_chunks] = JoinChunk{col_buff, elem_count};
137  } else {
138  continue;
139  }
140  ++num_chunks;
141  }
142 
143  int elem_sz = hash_col.get_type_info().get_size();
144  CHECK_GT(elem_sz, 0);
145 
146  return {col_chunks_buff,
147  col_chunks_buff_sz,
148  num_chunks,
149  num_elems,
150  static_cast<size_t>(elem_sz)};
151 }
152 
154  const int table_id,
155  const int frag_id,
156  const int col_id,
157  const std::map<int, const TableFragments*>& all_tables_fragments,
158  std::list<std::shared_ptr<Chunk_NS::Chunk>>& chunk_holder,
159  std::list<ChunkIter>& chunk_iter_holder,
160  const Data_Namespace::MemoryLevel memory_level,
161  const int device_id) const {
162  static std::mutex varlen_chunk_mutex; // TODO(alex): remove
163  static std::mutex chunk_list_mutex;
164  const auto fragments_it = all_tables_fragments.find(table_id);
165  CHECK(fragments_it != all_tables_fragments.end());
166  const auto fragments = fragments_it->second;
167  const auto& fragment = (*fragments)[frag_id];
168  if (fragment.isEmptyPhysicalFragment()) {
169  return nullptr;
170  }
171  std::shared_ptr<Chunk_NS::Chunk> chunk;
172  auto chunk_meta_it = fragment.getChunkMetadataMap().find(col_id);
173  CHECK(chunk_meta_it != fragment.getChunkMetadataMap().end());
174  CHECK(table_id > 0);
175  const auto& cat = *executor_->getCatalog();
176  auto cd = get_column_descriptor(col_id, table_id, cat);
177  CHECK(cd);
178  const auto col_type =
179  get_column_type(col_id, table_id, cd, executor_->temporary_tables_);
180  const bool is_real_string =
181  col_type.is_string() && col_type.get_compression() == kENCODING_NONE;
182  const bool is_varlen =
183  is_real_string ||
184  col_type.is_array(); // TODO: should it be col_type.is_varlen_array() ?
185  {
186  ChunkKey chunk_key{
187  cat.getCurrentDB().dbId, fragment.physicalTableId, col_id, fragment.fragmentId};
188  std::unique_ptr<std::lock_guard<std::mutex>> varlen_chunk_lock;
189  if (is_varlen) {
190  varlen_chunk_lock.reset(new std::lock_guard<std::mutex>(varlen_chunk_mutex));
191  }
193  cd,
194  &cat.getDataMgr(),
195  chunk_key,
196  memory_level,
197  memory_level == Data_Namespace::CPU_LEVEL ? 0 : device_id,
198  chunk_meta_it->second->numBytes,
199  chunk_meta_it->second->numElements);
200  std::lock_guard<std::mutex> chunk_list_lock(chunk_list_mutex);
201  chunk_holder.push_back(chunk);
202  }
203  if (is_varlen) {
204  CHECK_GT(table_id, 0);
205  CHECK(chunk_meta_it != fragment.getChunkMetadataMap().end());
206  chunk_iter_holder.push_back(chunk->begin_iterator(chunk_meta_it->second));
207  auto& chunk_iter = chunk_iter_holder.back();
208  if (memory_level == Data_Namespace::CPU_LEVEL) {
209  return reinterpret_cast<int8_t*>(&chunk_iter);
210  } else {
211  auto ab = chunk->getBuffer();
212  ab->pin();
213  auto& row_set_mem_owner = executor_->getRowSetMemoryOwner();
214  row_set_mem_owner->addVarlenInputBuffer(ab);
215  CHECK_EQ(Data_Namespace::GPU_LEVEL, memory_level);
216  auto& data_mgr = cat.getDataMgr();
217  auto chunk_iter_gpu = CudaAllocator::alloc(&data_mgr, sizeof(ChunkIter), device_id);
218  copy_to_gpu(&data_mgr,
219  reinterpret_cast<CUdeviceptr>(chunk_iter_gpu),
220  &chunk_iter,
221  sizeof(ChunkIter),
222  device_id);
223  return chunk_iter_gpu;
224  }
225  } else {
226  auto ab = chunk->getBuffer();
227  CHECK(ab->getMemoryPtr());
228  return ab->getMemoryPtr(); // @TODO(alex) change to use ChunkIter
229  }
230 }
231 
233  const int table_id,
234  const int col_id,
235  const std::map<int, const TableFragments*>& all_tables_fragments,
236  const Data_Namespace::MemoryLevel memory_level,
237  const int device_id) const {
238  const auto fragments_it = all_tables_fragments.find(table_id);
239  CHECK(fragments_it != all_tables_fragments.end());
240  const auto fragments = fragments_it->second;
241  const auto frag_count = fragments->size();
242  std::vector<std::unique_ptr<ColumnarResults>> column_frags;
243  const ColumnarResults* table_column = nullptr;
244  const InputColDescriptor col_desc(col_id, table_id, int(0));
246  {
247  std::lock_guard<std::mutex> columnar_conversion_guard(columnar_conversion_mutex_);
248  auto column_it = columnarized_scan_table_cache_.find(col_desc);
249  if (column_it == columnarized_scan_table_cache_.end()) {
250  for (size_t frag_id = 0; frag_id < frag_count; ++frag_id) {
251  std::list<std::shared_ptr<Chunk_NS::Chunk>> chunk_holder;
252  std::list<ChunkIter> chunk_iter_holder;
253  const auto& fragment = (*fragments)[frag_id];
254  if (fragment.isEmptyPhysicalFragment()) {
255  continue;
256  }
257  auto chunk_meta_it = fragment.getChunkMetadataMap().find(col_id);
258  CHECK(chunk_meta_it != fragment.getChunkMetadataMap().end());
259  auto col_buffer = getOneTableColumnFragment(table_id,
260  static_cast<int>(frag_id),
261  col_id,
262  all_tables_fragments,
263  chunk_holder,
264  chunk_iter_holder,
266  int(0));
267  column_frags.push_back(
268  std::make_unique<ColumnarResults>(executor_->row_set_mem_owner_,
269  col_buffer,
270  fragment.getNumTuples(),
271  chunk_meta_it->second->sqlType));
272  }
273  auto merged_results =
274  ColumnarResults::mergeResults(executor_->row_set_mem_owner_, column_frags);
275  table_column = merged_results.get();
276  columnarized_scan_table_cache_.emplace(col_desc, std::move(merged_results));
277  } else {
278  table_column = column_it->second.get();
279  }
280  }
282  table_column, 0, &executor_->getCatalog()->getDataMgr(), memory_level, device_id);
283 }
284 
286  const InputColDescriptor* col_desc,
287  const Data_Namespace::MemoryLevel memory_level,
288  const int device_id) const {
289  CHECK(col_desc);
290  const auto table_id = col_desc->getScanDesc().getTableId();
291  return getResultSetColumn(get_temporary_table(executor_->temporary_tables_, table_id),
292  table_id,
293  col_desc->getColId(),
294  memory_level,
295  device_id);
296 }
297 
299  const ColumnarResults* columnar_results,
300  const int col_id,
301  Data_Namespace::DataMgr* data_mgr,
302  const Data_Namespace::MemoryLevel memory_level,
303  const int device_id) {
304  if (!columnar_results) {
305  return nullptr;
306  }
307  const auto& col_buffers = columnar_results->getColumnBuffers();
308  CHECK_LT(static_cast<size_t>(col_id), col_buffers.size());
309  if (memory_level == Data_Namespace::GPU_LEVEL) {
310  const auto& col_ti = columnar_results->getColumnType(col_id);
311  const auto num_bytes = columnar_results->size() * col_ti.get_size();
312  auto gpu_col_buffer = CudaAllocator::alloc(data_mgr, num_bytes, device_id);
313  copy_to_gpu(data_mgr,
314  reinterpret_cast<CUdeviceptr>(gpu_col_buffer),
315  col_buffers[col_id],
316  num_bytes,
317  device_id);
318  return gpu_col_buffer;
319  }
320  return col_buffers[col_id];
321 }
322 
324  const ResultSetPtr& buffer,
325  const int table_id,
326  const int col_id,
327  const Data_Namespace::MemoryLevel memory_level,
328  const int device_id) const {
329  const ColumnarResults* result{nullptr};
330  {
331  std::lock_guard<std::mutex> columnar_conversion_guard(columnar_conversion_mutex_);
332  if (columnarized_table_cache_.empty() || !columnarized_table_cache_.count(table_id)) {
333  columnarized_table_cache_.insert(std::make_pair(
334  table_id, std::unordered_map<int, std::shared_ptr<const ColumnarResults>>()));
335  }
336  auto& frag_id_to_result = columnarized_table_cache_[table_id];
337  int frag_id = 0;
338  if (frag_id_to_result.empty() || !frag_id_to_result.count(frag_id)) {
339  frag_id_to_result.insert(
340  std::make_pair(frag_id,
341  std::shared_ptr<const ColumnarResults>(columnarize_result(
342  executor_->row_set_mem_owner_, buffer, frag_id))));
343  }
344  CHECK_NE(size_t(0), columnarized_table_cache_.count(table_id));
345  result = columnarized_table_cache_[table_id][frag_id].get();
346  }
347  CHECK_GE(col_id, 0);
348  return transferColumnIfNeeded(
349  result, col_id, &executor_->getCatalog()->getDataMgr(), memory_level, device_id);
350 }
#define CHECK_EQ(x, y)
Definition: Logger.h:205
static JoinColumn makeJoinColumn(Executor *executor, const Analyzer::ColumnVar &hash_col, const std::vector< Fragmenter_Namespace::FragmentInfo > &fragments, const Data_Namespace::MemoryLevel effective_mem_lvl, const int device_id, std::vector< std::shared_ptr< Chunk_NS::Chunk >> &chunks_owner, std::vector< std::shared_ptr< void >> &malloc_owner, ColumnCacheMap &column_cache)
Creates a JoinColumn struct containing an array of JoinChunk structs.
int get_column_id() const
Definition: Analyzer.h:195
const ChunkMetadataMap & getChunkMetadataMap() const
static std::unique_ptr< ColumnarResults > mergeResults(const std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner, const std::vector< std::unique_ptr< ColumnarResults >> &sub_results)
std::unordered_map< int, std::unordered_map< int, std::shared_ptr< const ColumnarResults > >> ColumnCacheMap
InputSourceType getSourceType() const
int getTableId() const
const SQLTypeInfo get_column_type(const int col_id, const int table_id, const ColumnDescriptor *cd, const TemporaryTables *temporary_tables)
Definition: Execute.h:193
#define CHECK_GE(x, y)
Definition: Logger.h:210
HOST DEVICE int get_size() const
Definition: sqltypes.h:268
const int8_t * getResultSetColumn(const InputColDescriptor *col_desc, const Data_Namespace::MemoryLevel memory_level, const int device_id) const
ColumnCacheMap columnarized_table_cache_
Definition: ColumnFetcher.h:84
std::shared_ptr< ResultSet > ResultSetPtr
Executor * executor_
Definition: ColumnFetcher.h:81
const int8_t * col_buff
#define CHECK_GT(x, y)
Definition: Logger.h:209
std::unordered_map< InputColDescriptor, std::unique_ptr< const ColumnarResults > > columnarized_scan_table_cache_
Definition: ColumnFetcher.h:90
const InputDescriptor & getScanDesc() const
const std::vector< int8_t * > & getColumnBuffers() const
const ResultSetPtr & get_temporary_table(const TemporaryTables *temporary_tables, const int table_id)
Definition: Execute.h:185
static int8_t * alloc(Data_Namespace::DataMgr *data_mgr, const size_t num_bytes, const int device_id)
std::mutex columnar_conversion_mutex_
Definition: ColumnFetcher.h:83
std::string cat(Ts &&... args)
void copy_to_gpu(Data_Namespace::DataMgr *data_mgr, CUdeviceptr dst, const void *src, const size_t num_bytes, const int device_id)
Definition: GpuMemUtils.cpp:31
const ColumnDescriptor * get_column_descriptor_maybe(const int col_id, const int table_id, const Catalog_Namespace::Catalog &cat)
Definition: Execute.h:177
void * checked_malloc(const size_t size)
Definition: checked_alloc.h:44
#define CHECK_NE(x, y)
Definition: Logger.h:206
Used by Fragmenter classes to store info about each fragment - the fragment id and number of tuples(r...
Definition: Fragmenter.h:78
const size_t size() const
static std::shared_ptr< Chunk > getChunk(const ColumnDescriptor *cd, DataMgr *data_mgr, const ChunkKey &key, const MemoryLevel mem_level, const int deviceId, const size_t num_bytes, const size_t num_elems)
Definition: Chunk.cpp:28
ColumnFetcher(Executor *executor, const ColumnCacheMap &column_cache)
static std::pair< const int8_t *, size_t > getOneColumnFragment(Executor *executor, const Analyzer::ColumnVar &hash_col, const Fragmenter_Namespace::FragmentInfo &fragment, const Data_Namespace::MemoryLevel effective_mem_lvl, const int device_id, std::vector< std::shared_ptr< Chunk_NS::Chunk >> &chunks_owner, ColumnCacheMap &column_cache)
Gets one chunk's pointer and element count on either CPU or GPU.
#define CHECK_LT(x, y)
Definition: Logger.h:207
const ColumnarResults * columnarize_result(std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner, const ResultSetPtr &result, const int frag_id)
Definition: Execute.h:229
const int8_t * getAllTableColumnFragments(const int table_id, const int col_id, const std::map< int, const TableFragments *> &all_tables_fragments, const Data_Namespace::MemoryLevel memory_level, const int device_id) const
int get_table_id() const
Definition: Analyzer.h:194
const SQLTypeInfo & get_type_info() const
Definition: Analyzer.h:78
#define CHECK(condition)
Definition: Logger.h:197
std::vector< int > ChunkKey
Definition: types.h:35
const int8_t * getOneTableColumnFragment(const int table_id, const int frag_id, const int col_id, const std::map< int, const TableFragments *> &all_tables_fragments, std::list< std::shared_ptr< Chunk_NS::Chunk >> &chunk_holder, std::list< ChunkIter > &chunk_iter_holder, const Data_Namespace::MemoryLevel memory_level, const int device_id) const
static const int8_t * transferColumnIfNeeded(const ColumnarResults *columnar_results, const int col_id, Data_Namespace::DataMgr *data_mgr, const Data_Namespace::MemoryLevel memory_level, const int device_id)
size_t num_elems
const ColumnDescriptor * get_column_descriptor(const int col_id, const int table_id, const Catalog_Namespace::Catalog &cat)
Definition: Execute.h:148
const SQLTypeInfo & getColumnType(const int col_id) const