OmniSciDB  8a228a1076
ColumnFetcher.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2017 MapD Technologies, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
18 
19 #include <memory>
20 
21 #include "QueryEngine/Execute.h"
22 
23 ColumnFetcher::ColumnFetcher(Executor* executor, const ColumnCacheMap& column_cache)
24  : executor_(executor), columnarized_table_cache_(column_cache) {}
25 
29 std::pair<const int8_t*, size_t> ColumnFetcher::getOneColumnFragment(
30  Executor* executor,
31  const Analyzer::ColumnVar& hash_col,
32  const Fragmenter_Namespace::FragmentInfo& fragment,
33  const Data_Namespace::MemoryLevel effective_mem_lvl,
34  const int device_id,
35  DeviceAllocator* device_allocator,
36  std::vector<std::shared_ptr<Chunk_NS::Chunk>>& chunks_owner,
37  ColumnCacheMap& column_cache) {
38  static std::mutex columnar_conversion_mutex;
39  if (fragment.isEmptyPhysicalFragment()) {
40  return {nullptr, 0};
41  }
42  const auto table_id = hash_col.get_table_id();
43  auto chunk_meta_it = fragment.getChunkMetadataMap().find(hash_col.get_column_id());
44  CHECK(chunk_meta_it != fragment.getChunkMetadataMap().end());
45  const auto& catalog = *executor->getCatalog();
46  const auto cd =
47  get_column_descriptor_maybe(hash_col.get_column_id(), table_id, catalog);
48  CHECK(!cd || !(cd->isVirtualCol));
49  const int8_t* col_buff = nullptr;
50  if (cd) { // real table
51  ChunkKey chunk_key{catalog.getCurrentDB().dbId,
52  fragment.physicalTableId,
53  hash_col.get_column_id(),
54  fragment.fragmentId};
55  const auto chunk = Chunk_NS::Chunk::getChunk(
56  cd,
57  &catalog.getDataMgr(),
58  chunk_key,
59  effective_mem_lvl,
60  effective_mem_lvl == Data_Namespace::CPU_LEVEL ? 0 : device_id,
61  chunk_meta_it->second->numBytes,
62  chunk_meta_it->second->numElements);
63  chunks_owner.push_back(chunk);
64  CHECK(chunk);
65  auto ab = chunk->getBuffer();
66  CHECK(ab->getMemoryPtr());
67  col_buff = reinterpret_cast<int8_t*>(ab->getMemoryPtr());
68  } else { // temporary table
69  const ColumnarResults* col_frag{nullptr};
70  {
71  std::lock_guard<std::mutex> columnar_conversion_guard(columnar_conversion_mutex);
72  const auto frag_id = fragment.fragmentId;
73  if (column_cache.empty() || !column_cache.count(table_id)) {
74  column_cache.insert(std::make_pair(
75  table_id, std::unordered_map<int, std::shared_ptr<const ColumnarResults>>()));
76  }
77  auto& frag_id_to_result = column_cache[table_id];
78  if (frag_id_to_result.empty() || !frag_id_to_result.count(frag_id)) {
79  frag_id_to_result.insert(
80  std::make_pair(frag_id,
81  std::shared_ptr<const ColumnarResults>(columnarize_result(
82  executor->row_set_mem_owner_,
83  get_temporary_table(executor->temporary_tables_, table_id),
84  frag_id))));
85  }
86  col_frag = column_cache[table_id][frag_id].get();
87  }
88  col_buff = transferColumnIfNeeded(
89  col_frag,
90  hash_col.get_column_id(),
91  &catalog.getDataMgr(),
92  effective_mem_lvl,
93  effective_mem_lvl == Data_Namespace::CPU_LEVEL ? 0 : device_id,
94  device_allocator);
95  }
96  return {col_buff, fragment.getNumTuples()};
97 }
98 
109  Executor* executor,
110  const Analyzer::ColumnVar& hash_col,
111  const std::vector<Fragmenter_Namespace::FragmentInfo>& fragments,
112  const Data_Namespace::MemoryLevel effective_mem_lvl,
113  const int device_id,
114  DeviceAllocator* device_allocator,
115  std::vector<std::shared_ptr<Chunk_NS::Chunk>>& chunks_owner,
116  std::vector<std::shared_ptr<void>>& malloc_owner,
117  ColumnCacheMap& column_cache) {
118  CHECK(!fragments.empty());
119 
120  size_t col_chunks_buff_sz = sizeof(struct JoinChunk) * fragments.size();
121  // TODO: needs an allocator owner
122  auto col_chunks_buff = reinterpret_cast<int8_t*>(
123  malloc_owner.emplace_back(checked_malloc(col_chunks_buff_sz), free).get());
124  auto join_chunk_array = reinterpret_cast<struct JoinChunk*>(col_chunks_buff);
125 
126  size_t num_elems = 0;
127  size_t num_chunks = 0;
128  for (auto& frag : fragments) {
129  auto [col_buff, elem_count] = getOneColumnFragment(
130  executor,
131  hash_col,
132  frag,
133  effective_mem_lvl,
134  effective_mem_lvl == Data_Namespace::CPU_LEVEL ? 0 : device_id,
135  device_allocator,
136  chunks_owner,
137  column_cache);
138  if (col_buff != nullptr) {
139  num_elems += elem_count;
140  join_chunk_array[num_chunks] = JoinChunk{col_buff, elem_count};
141  } else {
142  continue;
143  }
144  ++num_chunks;
145  }
146 
147  CHECK(!hash_col.get_type_info().is_column());
148  int elem_sz = hash_col.get_type_info().get_size();
149  CHECK_GT(elem_sz, 0);
150 
151  return {col_chunks_buff,
152  col_chunks_buff_sz,
153  num_chunks,
154  num_elems,
155  static_cast<size_t>(elem_sz)};
156 }
157 
159  const int table_id,
160  const int frag_id,
161  const int col_id,
162  const std::map<int, const TableFragments*>& all_tables_fragments,
163  std::list<std::shared_ptr<Chunk_NS::Chunk>>& chunk_holder,
164  std::list<ChunkIter>& chunk_iter_holder,
165  const Data_Namespace::MemoryLevel memory_level,
166  const int device_id,
167  DeviceAllocator* allocator) const {
168  static std::mutex varlen_chunk_mutex; // TODO(alex): remove
169  static std::mutex chunk_list_mutex;
170  const auto fragments_it = all_tables_fragments.find(table_id);
171  CHECK(fragments_it != all_tables_fragments.end());
172  const auto fragments = fragments_it->second;
173  const auto& fragment = (*fragments)[frag_id];
174  if (fragment.isEmptyPhysicalFragment()) {
175  return nullptr;
176  }
177  std::shared_ptr<Chunk_NS::Chunk> chunk;
178  auto chunk_meta_it = fragment.getChunkMetadataMap().find(col_id);
179  CHECK(chunk_meta_it != fragment.getChunkMetadataMap().end());
180  CHECK(table_id > 0);
181  const auto& cat = *executor_->getCatalog();
182  auto cd = get_column_descriptor(col_id, table_id, cat);
183  CHECK(cd);
184  const auto col_type =
185  get_column_type(col_id, table_id, cd, executor_->temporary_tables_);
186  const bool is_real_string =
187  col_type.is_string() && col_type.get_compression() == kENCODING_NONE;
188  const bool is_varlen =
189  is_real_string ||
190  col_type.is_array(); // TODO: should it be col_type.is_varlen_array() ?
191  {
192  ChunkKey chunk_key{
193  cat.getCurrentDB().dbId, fragment.physicalTableId, col_id, fragment.fragmentId};
194  std::unique_ptr<std::lock_guard<std::mutex>> varlen_chunk_lock;
195  if (is_varlen) {
196  varlen_chunk_lock.reset(new std::lock_guard<std::mutex>(varlen_chunk_mutex));
197  }
199  cd,
200  &cat.getDataMgr(),
201  chunk_key,
202  memory_level,
203  memory_level == Data_Namespace::CPU_LEVEL ? 0 : device_id,
204  chunk_meta_it->second->numBytes,
205  chunk_meta_it->second->numElements);
206  std::lock_guard<std::mutex> chunk_list_lock(chunk_list_mutex);
207  chunk_holder.push_back(chunk);
208  }
209  if (is_varlen) {
210  CHECK_GT(table_id, 0);
211  CHECK(chunk_meta_it != fragment.getChunkMetadataMap().end());
212  chunk_iter_holder.push_back(chunk->begin_iterator(chunk_meta_it->second));
213  auto& chunk_iter = chunk_iter_holder.back();
214  if (memory_level == Data_Namespace::CPU_LEVEL) {
215  return reinterpret_cast<int8_t*>(&chunk_iter);
216  } else {
217  auto ab = chunk->getBuffer();
218  ab->pin();
219  auto& row_set_mem_owner = executor_->getRowSetMemoryOwner();
220  row_set_mem_owner->addVarlenInputBuffer(ab);
221  CHECK_EQ(Data_Namespace::GPU_LEVEL, memory_level);
222  CHECK(allocator);
223  auto chunk_iter_gpu = allocator->alloc(sizeof(ChunkIter));
224  allocator->copyToDevice(
225  chunk_iter_gpu, reinterpret_cast<int8_t*>(&chunk_iter), sizeof(ChunkIter));
226  return chunk_iter_gpu;
227  }
228  } else {
229  auto ab = chunk->getBuffer();
230  CHECK(ab->getMemoryPtr());
231  return ab->getMemoryPtr(); // @TODO(alex) change to use ChunkIter
232  }
233 }
234 
236  const int table_id,
237  const int col_id,
238  const std::map<int, const TableFragments*>& all_tables_fragments,
239  const Data_Namespace::MemoryLevel memory_level,
240  const int device_id,
241  DeviceAllocator* device_allocator) const {
242  const auto fragments_it = all_tables_fragments.find(table_id);
243  CHECK(fragments_it != all_tables_fragments.end());
244  const auto fragments = fragments_it->second;
245  const auto frag_count = fragments->size();
246  std::vector<std::unique_ptr<ColumnarResults>> column_frags;
247  const ColumnarResults* table_column = nullptr;
248  const InputColDescriptor col_desc(col_id, table_id, int(0));
250  {
251  std::lock_guard<std::mutex> columnar_conversion_guard(columnar_conversion_mutex_);
252  auto column_it = columnarized_scan_table_cache_.find(col_desc);
253  if (column_it == columnarized_scan_table_cache_.end()) {
254  for (size_t frag_id = 0; frag_id < frag_count; ++frag_id) {
255  std::list<std::shared_ptr<Chunk_NS::Chunk>> chunk_holder;
256  std::list<ChunkIter> chunk_iter_holder;
257  const auto& fragment = (*fragments)[frag_id];
258  if (fragment.isEmptyPhysicalFragment()) {
259  continue;
260  }
261  auto chunk_meta_it = fragment.getChunkMetadataMap().find(col_id);
262  CHECK(chunk_meta_it != fragment.getChunkMetadataMap().end());
263  auto col_buffer = getOneTableColumnFragment(table_id,
264  static_cast<int>(frag_id),
265  col_id,
266  all_tables_fragments,
267  chunk_holder,
268  chunk_iter_holder,
270  int(0),
271  device_allocator);
272  column_frags.push_back(
273  std::make_unique<ColumnarResults>(executor_->row_set_mem_owner_,
274  col_buffer,
275  fragment.getNumTuples(),
276  chunk_meta_it->second->sqlType));
277  }
278  auto merged_results =
279  ColumnarResults::mergeResults(executor_->row_set_mem_owner_, column_frags);
280  table_column = merged_results.get();
281  columnarized_scan_table_cache_.emplace(col_desc, std::move(merged_results));
282  } else {
283  table_column = column_it->second.get();
284  }
285  }
286  return ColumnFetcher::transferColumnIfNeeded(table_column,
287  0,
288  &executor_->getCatalog()->getDataMgr(),
289  memory_level,
290  device_id,
291  device_allocator);
292 }
293 
295  const InputColDescriptor* col_desc,
296  const Data_Namespace::MemoryLevel memory_level,
297  const int device_id,
298  DeviceAllocator* device_allocator) const {
299  CHECK(col_desc);
300  const auto table_id = col_desc->getScanDesc().getTableId();
301  return getResultSetColumn(get_temporary_table(executor_->temporary_tables_, table_id),
302  table_id,
303  col_desc->getColId(),
304  memory_level,
305  device_id,
306  device_allocator);
307 }
308 
310  const ColumnarResults* columnar_results,
311  const int col_id,
312  Data_Namespace::DataMgr* data_mgr,
313  const Data_Namespace::MemoryLevel memory_level,
314  const int device_id,
315  DeviceAllocator* device_allocator) {
316  if (!columnar_results) {
317  return nullptr;
318  }
319  const auto& col_buffers = columnar_results->getColumnBuffers();
320  CHECK_LT(static_cast<size_t>(col_id), col_buffers.size());
321  if (memory_level == Data_Namespace::GPU_LEVEL) {
322  const auto& col_ti_ = columnar_results->getColumnType(col_id);
323  const auto& col_ti = (col_ti_.is_column() ? col_ti_.get_elem_type() : col_ti_);
324  const auto num_bytes = columnar_results->size() * col_ti.get_size();
325  CHECK(device_allocator);
326  auto gpu_col_buffer = device_allocator->alloc(num_bytes);
327  device_allocator->copyToDevice(gpu_col_buffer, col_buffers[col_id], num_bytes);
328  return gpu_col_buffer;
329  }
330  return col_buffers[col_id];
331 }
332 
334  const ResultSetPtr& buffer,
335  const int table_id,
336  const int col_id,
337  const Data_Namespace::MemoryLevel memory_level,
338  const int device_id,
339  DeviceAllocator* device_allocator) const {
340  const ColumnarResults* result{nullptr};
341  {
342  std::lock_guard<std::mutex> columnar_conversion_guard(columnar_conversion_mutex_);
343  if (columnarized_table_cache_.empty() || !columnarized_table_cache_.count(table_id)) {
344  columnarized_table_cache_.insert(std::make_pair(
345  table_id, std::unordered_map<int, std::shared_ptr<const ColumnarResults>>()));
346  }
347  auto& frag_id_to_result = columnarized_table_cache_[table_id];
348  int frag_id = 0;
349  if (frag_id_to_result.empty() || !frag_id_to_result.count(frag_id)) {
350  frag_id_to_result.insert(
351  std::make_pair(frag_id,
352  std::shared_ptr<const ColumnarResults>(columnarize_result(
353  executor_->row_set_mem_owner_, buffer, frag_id))));
354  }
355  CHECK_NE(size_t(0), columnarized_table_cache_.count(table_id));
356  result = columnarized_table_cache_[table_id][frag_id].get();
357  }
358  CHECK_GE(col_id, 0);
360  col_id,
361  &executor_->getCatalog()->getDataMgr(),
362  memory_level,
363  device_id,
364  device_allocator);
365 }
#define CHECK_EQ(x, y)
Definition: Logger.h:205
int get_column_id() const
Definition: Analyzer.h:196
const ChunkMetadataMap & getChunkMetadataMap() const
static std::unique_ptr< ColumnarResults > mergeResults(const std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner, const std::vector< std::unique_ptr< ColumnarResults >> &sub_results)
std::unordered_map< int, std::unordered_map< int, std::shared_ptr< const ColumnarResults > >> ColumnCacheMap
InputSourceType getSourceType() const
int getTableId() const
virtual void copyToDevice(int8_t *device_dst, const int8_t *host_src, const size_t num_bytes) const =0
const SQLTypeInfo get_column_type(const int col_id, const int table_id, const ColumnDescriptor *cd, const TemporaryTables *temporary_tables)
Definition: Execute.h:165
#define CHECK_GE(x, y)
Definition: Logger.h:210
HOST DEVICE int get_size() const
Definition: sqltypes.h:269
virtual int8_t * alloc(const size_t num_bytes)=0
ColumnCacheMap columnarized_table_cache_
std::shared_ptr< ResultSet > ResultSetPtr
Executor * executor_
const int8_t * col_buff
#define CHECK_GT(x, y)
Definition: Logger.h:209
static JoinColumn makeJoinColumn(Executor *executor, const Analyzer::ColumnVar &hash_col, const std::vector< Fragmenter_Namespace::FragmentInfo > &fragments, const Data_Namespace::MemoryLevel effective_mem_lvl, const int device_id, DeviceAllocator *device_allocator, std::vector< std::shared_ptr< Chunk_NS::Chunk >> &chunks_owner, std::vector< std::shared_ptr< void >> &malloc_owner, ColumnCacheMap &column_cache)
Creates a JoinColumn struct containing an array of JoinChunk structs.
std::unordered_map< InputColDescriptor, std::unique_ptr< const ColumnarResults > > columnarized_scan_table_cache_
const InputDescriptor & getScanDesc() const
const std::vector< int8_t * > & getColumnBuffers() const
const ResultSetPtr & get_temporary_table(const TemporaryTables *temporary_tables, const int table_id)
Definition: Execute.h:157
std::mutex columnar_conversion_mutex_
std::string cat(Ts &&... args)
bool is_column() const
Definition: sqltypes.h:429
const ColumnDescriptor * get_column_descriptor_maybe(const int col_id, const int table_id, const Catalog_Namespace::Catalog &cat)
Definition: Execute.h:149
void * checked_malloc(const size_t size)
Definition: checked_alloc.h:44
#define CHECK_NE(x, y)
Definition: Logger.h:206
const int8_t * getResultSetColumn(const InputColDescriptor *col_desc, const Data_Namespace::MemoryLevel memory_level, const int device_id, DeviceAllocator *device_allocator) const
Used by Fragmenter classes to store info about each fragment - the fragment id and number of tuples(r...
Definition: Fragmenter.h:78
const size_t size() const
static std::shared_ptr< Chunk > getChunk(const ColumnDescriptor *cd, DataMgr *data_mgr, const ChunkKey &key, const MemoryLevel mem_level, const int deviceId, const size_t num_bytes, const size_t num_elems)
Definition: Chunk.cpp:28
const int8_t * getOneTableColumnFragment(const int table_id, const int frag_id, const int col_id, const std::map< int, const TableFragments *> &all_tables_fragments, std::list< std::shared_ptr< Chunk_NS::Chunk >> &chunk_holder, std::list< ChunkIter > &chunk_iter_holder, const Data_Namespace::MemoryLevel memory_level, const int device_id, DeviceAllocator *device_allocator) const
ColumnFetcher(Executor *executor, const ColumnCacheMap &column_cache)
#define CHECK_LT(x, y)
Definition: Logger.h:207
static const int8_t * transferColumnIfNeeded(const ColumnarResults *columnar_results, const int col_id, Data_Namespace::DataMgr *data_mgr, const Data_Namespace::MemoryLevel memory_level, const int device_id, DeviceAllocator *device_allocator)
const ColumnarResults * columnarize_result(std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner, const ResultSetPtr &result, const int frag_id)
Definition: Execute.h:201
int get_table_id() const
Definition: Analyzer.h:195
const int8_t * getAllTableColumnFragments(const int table_id, const int col_id, const std::map< int, const TableFragments *> &all_tables_fragments, const Data_Namespace::MemoryLevel memory_level, const int device_id, DeviceAllocator *device_allocator) const
const SQLTypeInfo & get_type_info() const
Definition: Analyzer.h:79
#define CHECK(condition)
Definition: Logger.h:197
static std::pair< const int8_t *, size_t > getOneColumnFragment(Executor *executor, const Analyzer::ColumnVar &hash_col, const Fragmenter_Namespace::FragmentInfo &fragment, const Data_Namespace::MemoryLevel effective_mem_lvl, const int device_id, DeviceAllocator *device_allocator, std::vector< std::shared_ptr< Chunk_NS::Chunk >> &chunks_owner, ColumnCacheMap &column_cache)
Gets one chunk's pointer and element count on either CPU or GPU.
std::vector< int > ChunkKey
Definition: types.h:35
size_t num_elems
const ColumnDescriptor * get_column_descriptor(const int col_id, const int table_id, const Catalog_Namespace::Catalog &cat)
Definition: Execute.h:120
const SQLTypeInfo & getColumnType(const int col_id) const