OmniSciDB  340b00dbf6
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
ColumnFetcher.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2017 MapD Technologies, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
18 
#include "QueryEngine/ColumnFetcher.h"  // own header first (inner line 17 was lost in extraction)

#include <memory>
#include <mutex>

#include "QueryEngine/Execute.h"
22 
23 ColumnFetcher::ColumnFetcher(Executor* executor, const ColumnCacheMap& column_cache)
24  : executor_(executor), columnarized_table_cache_(column_cache) {}
25 
29 std::pair<const int8_t*, size_t> ColumnFetcher::getOneColumnFragment(
30  Executor* executor,
31  const Analyzer::ColumnVar& hash_col,
32  const Fragmenter_Namespace::FragmentInfo& fragment,
33  const Data_Namespace::MemoryLevel effective_mem_lvl,
34  const int device_id,
35  DeviceAllocator* device_allocator,
36  std::vector<std::shared_ptr<Chunk_NS::Chunk>>& chunks_owner,
37  ColumnCacheMap& column_cache) {
38  static std::mutex columnar_conversion_mutex;
39  if (fragment.isEmptyPhysicalFragment()) {
40  return {nullptr, 0};
41  }
42  const auto table_id = hash_col.get_table_id();
43  auto chunk_meta_it = fragment.getChunkMetadataMap().find(hash_col.get_column_id());
44  CHECK(chunk_meta_it != fragment.getChunkMetadataMap().end());
45  const auto& catalog = *executor->getCatalog();
46  const auto cd =
47  get_column_descriptor_maybe(hash_col.get_column_id(), table_id, catalog);
48  CHECK(!cd || !(cd->isVirtualCol));
49  const int8_t* col_buff = nullptr;
50  if (cd) { // real table
51  ChunkKey chunk_key{catalog.getCurrentDB().dbId,
52  fragment.physicalTableId,
53  hash_col.get_column_id(),
54  fragment.fragmentId};
55  const auto chunk = Chunk_NS::Chunk::getChunk(
56  cd,
57  &catalog.getDataMgr(),
58  chunk_key,
59  effective_mem_lvl,
60  effective_mem_lvl == Data_Namespace::CPU_LEVEL ? 0 : device_id,
61  chunk_meta_it->second->numBytes,
62  chunk_meta_it->second->numElements);
63  chunks_owner.push_back(chunk);
64  CHECK(chunk);
65  auto ab = chunk->getBuffer();
66  CHECK(ab->getMemoryPtr());
67  col_buff = reinterpret_cast<int8_t*>(ab->getMemoryPtr());
68  } else { // temporary table
69  const ColumnarResults* col_frag{nullptr};
70  {
71  std::lock_guard<std::mutex> columnar_conversion_guard(columnar_conversion_mutex);
72  const auto frag_id = fragment.fragmentId;
73  if (column_cache.empty() || !column_cache.count(table_id)) {
74  column_cache.insert(std::make_pair(
75  table_id, std::unordered_map<int, std::shared_ptr<const ColumnarResults>>()));
76  }
77  auto& frag_id_to_result = column_cache[table_id];
78  if (frag_id_to_result.empty() || !frag_id_to_result.count(frag_id)) {
79  frag_id_to_result.insert(
80  std::make_pair(frag_id,
81  std::shared_ptr<const ColumnarResults>(columnarize_result(
82  executor->row_set_mem_owner_,
83  get_temporary_table(executor->temporary_tables_, table_id),
84  frag_id))));
85  }
86  col_frag = column_cache[table_id][frag_id].get();
87  }
88  col_buff = transferColumnIfNeeded(
89  col_frag,
90  hash_col.get_column_id(),
91  &catalog.getDataMgr(),
92  effective_mem_lvl,
93  effective_mem_lvl == Data_Namespace::CPU_LEVEL ? 0 : device_id,
94  device_allocator);
95  }
96  return {col_buff, fragment.getNumTuples()};
97 }
98 
109  Executor* executor,
110  const Analyzer::ColumnVar& hash_col,
111  const std::vector<Fragmenter_Namespace::FragmentInfo>& fragments,
112  const Data_Namespace::MemoryLevel effective_mem_lvl,
113  const int device_id,
114  DeviceAllocator* device_allocator,
115  std::vector<std::shared_ptr<Chunk_NS::Chunk>>& chunks_owner,
116  std::vector<std::shared_ptr<void>>& malloc_owner,
117  ColumnCacheMap& column_cache) {
118  CHECK(!fragments.empty());
119 
120  size_t col_chunks_buff_sz = sizeof(struct JoinChunk) * fragments.size();
121  // TODO: needs an allocator owner
122  auto col_chunks_buff = reinterpret_cast<int8_t*>(
123  malloc_owner.emplace_back(checked_malloc(col_chunks_buff_sz), free).get());
124  auto join_chunk_array = reinterpret_cast<struct JoinChunk*>(col_chunks_buff);
125 
126  size_t num_elems = 0;
127  size_t num_chunks = 0;
128  for (auto& frag : fragments) {
129  auto [col_buff, elem_count] = getOneColumnFragment(
130  executor,
131  hash_col,
132  frag,
133  effective_mem_lvl,
134  effective_mem_lvl == Data_Namespace::CPU_LEVEL ? 0 : device_id,
135  device_allocator,
136  chunks_owner,
137  column_cache);
138  if (col_buff != nullptr) {
139  num_elems += elem_count;
140  join_chunk_array[num_chunks] = JoinChunk{col_buff, elem_count};
141  } else {
142  continue;
143  }
144  ++num_chunks;
145  }
146 
147  int elem_sz = hash_col.get_type_info().get_size();
148  CHECK_GT(elem_sz, 0);
149 
150  return {col_chunks_buff,
151  col_chunks_buff_sz,
152  num_chunks,
153  num_elems,
154  static_cast<size_t>(elem_sz)};
155 }
156 
158  const int table_id,
159  const int frag_id,
160  const int col_id,
161  const std::map<int, const TableFragments*>& all_tables_fragments,
162  std::list<std::shared_ptr<Chunk_NS::Chunk>>& chunk_holder,
163  std::list<ChunkIter>& chunk_iter_holder,
164  const Data_Namespace::MemoryLevel memory_level,
165  const int device_id,
166  DeviceAllocator* allocator) const {
167  static std::mutex varlen_chunk_mutex; // TODO(alex): remove
168  static std::mutex chunk_list_mutex;
169  const auto fragments_it = all_tables_fragments.find(table_id);
170  CHECK(fragments_it != all_tables_fragments.end());
171  const auto fragments = fragments_it->second;
172  const auto& fragment = (*fragments)[frag_id];
173  if (fragment.isEmptyPhysicalFragment()) {
174  return nullptr;
175  }
176  std::shared_ptr<Chunk_NS::Chunk> chunk;
177  auto chunk_meta_it = fragment.getChunkMetadataMap().find(col_id);
178  CHECK(chunk_meta_it != fragment.getChunkMetadataMap().end());
179  CHECK(table_id > 0);
180  const auto& cat = *executor_->getCatalog();
181  auto cd = get_column_descriptor(col_id, table_id, cat);
182  CHECK(cd);
183  const auto col_type =
184  get_column_type(col_id, table_id, cd, executor_->temporary_tables_);
185  const bool is_real_string =
186  col_type.is_string() && col_type.get_compression() == kENCODING_NONE;
187  const bool is_varlen =
188  is_real_string ||
189  col_type.is_array(); // TODO: should it be col_type.is_varlen_array() ?
190  {
191  ChunkKey chunk_key{
192  cat.getCurrentDB().dbId, fragment.physicalTableId, col_id, fragment.fragmentId};
193  std::unique_ptr<std::lock_guard<std::mutex>> varlen_chunk_lock;
194  if (is_varlen) {
195  varlen_chunk_lock.reset(new std::lock_guard<std::mutex>(varlen_chunk_mutex));
196  }
198  cd,
199  &cat.getDataMgr(),
200  chunk_key,
201  memory_level,
202  memory_level == Data_Namespace::CPU_LEVEL ? 0 : device_id,
203  chunk_meta_it->second->numBytes,
204  chunk_meta_it->second->numElements);
205  std::lock_guard<std::mutex> chunk_list_lock(chunk_list_mutex);
206  chunk_holder.push_back(chunk);
207  }
208  if (is_varlen) {
209  CHECK_GT(table_id, 0);
210  CHECK(chunk_meta_it != fragment.getChunkMetadataMap().end());
211  chunk_iter_holder.push_back(chunk->begin_iterator(chunk_meta_it->second));
212  auto& chunk_iter = chunk_iter_holder.back();
213  if (memory_level == Data_Namespace::CPU_LEVEL) {
214  return reinterpret_cast<int8_t*>(&chunk_iter);
215  } else {
216  auto ab = chunk->getBuffer();
217  ab->pin();
218  auto& row_set_mem_owner = executor_->getRowSetMemoryOwner();
219  row_set_mem_owner->addVarlenInputBuffer(ab);
220  CHECK_EQ(Data_Namespace::GPU_LEVEL, memory_level);
221  CHECK(allocator);
222  auto chunk_iter_gpu = allocator->alloc(sizeof(ChunkIter));
223  allocator->copyToDevice(
224  chunk_iter_gpu, reinterpret_cast<int8_t*>(&chunk_iter), sizeof(ChunkIter));
225  return chunk_iter_gpu;
226  }
227  } else {
228  auto ab = chunk->getBuffer();
229  CHECK(ab->getMemoryPtr());
230  return ab->getMemoryPtr(); // @TODO(alex) change to use ChunkIter
231  }
232 }
233 
235  const int table_id,
236  const int col_id,
237  const std::map<int, const TableFragments*>& all_tables_fragments,
238  const Data_Namespace::MemoryLevel memory_level,
239  const int device_id,
240  DeviceAllocator* device_allocator) const {
241  const auto fragments_it = all_tables_fragments.find(table_id);
242  CHECK(fragments_it != all_tables_fragments.end());
243  const auto fragments = fragments_it->second;
244  const auto frag_count = fragments->size();
245  std::vector<std::unique_ptr<ColumnarResults>> column_frags;
246  const ColumnarResults* table_column = nullptr;
247  const InputColDescriptor col_desc(col_id, table_id, int(0));
249  {
250  std::lock_guard<std::mutex> columnar_conversion_guard(columnar_conversion_mutex_);
251  auto column_it = columnarized_scan_table_cache_.find(col_desc);
252  if (column_it == columnarized_scan_table_cache_.end()) {
253  for (size_t frag_id = 0; frag_id < frag_count; ++frag_id) {
254  std::list<std::shared_ptr<Chunk_NS::Chunk>> chunk_holder;
255  std::list<ChunkIter> chunk_iter_holder;
256  const auto& fragment = (*fragments)[frag_id];
257  if (fragment.isEmptyPhysicalFragment()) {
258  continue;
259  }
260  auto chunk_meta_it = fragment.getChunkMetadataMap().find(col_id);
261  CHECK(chunk_meta_it != fragment.getChunkMetadataMap().end());
262  auto col_buffer = getOneTableColumnFragment(table_id,
263  static_cast<int>(frag_id),
264  col_id,
265  all_tables_fragments,
266  chunk_holder,
267  chunk_iter_holder,
269  int(0),
270  device_allocator);
271  column_frags.push_back(
272  std::make_unique<ColumnarResults>(executor_->row_set_mem_owner_,
273  col_buffer,
274  fragment.getNumTuples(),
275  chunk_meta_it->second->sqlType));
276  }
277  auto merged_results =
278  ColumnarResults::mergeResults(executor_->row_set_mem_owner_, column_frags);
279  table_column = merged_results.get();
280  columnarized_scan_table_cache_.emplace(col_desc, std::move(merged_results));
281  } else {
282  table_column = column_it->second.get();
283  }
284  }
285  return ColumnFetcher::transferColumnIfNeeded(table_column,
286  0,
287  &executor_->getCatalog()->getDataMgr(),
288  memory_level,
289  device_id,
290  device_allocator);
291 }
292 
294  const InputColDescriptor* col_desc,
295  const Data_Namespace::MemoryLevel memory_level,
296  const int device_id,
297  DeviceAllocator* device_allocator) const {
298  CHECK(col_desc);
299  const auto table_id = col_desc->getScanDesc().getTableId();
300  return getResultSetColumn(get_temporary_table(executor_->temporary_tables_, table_id),
301  table_id,
302  col_desc->getColId(),
303  memory_level,
304  device_id,
305  device_allocator);
306 }
307 
309  const ColumnarResults* columnar_results,
310  const int col_id,
311  Data_Namespace::DataMgr* data_mgr,
312  const Data_Namespace::MemoryLevel memory_level,
313  const int device_id,
314  DeviceAllocator* device_allocator) {
315  if (!columnar_results) {
316  return nullptr;
317  }
318  const auto& col_buffers = columnar_results->getColumnBuffers();
319  CHECK_LT(static_cast<size_t>(col_id), col_buffers.size());
320  if (memory_level == Data_Namespace::GPU_LEVEL) {
321  const auto& col_ti = columnar_results->getColumnType(col_id);
322  const auto num_bytes = columnar_results->size() * col_ti.get_size();
323  CHECK(device_allocator);
324  auto gpu_col_buffer = device_allocator->alloc(num_bytes);
325  device_allocator->copyToDevice(gpu_col_buffer, col_buffers[col_id], num_bytes);
326  return gpu_col_buffer;
327  }
328  return col_buffers[col_id];
329 }
330 
332  const ResultSetPtr& buffer,
333  const int table_id,
334  const int col_id,
335  const Data_Namespace::MemoryLevel memory_level,
336  const int device_id,
337  DeviceAllocator* device_allocator) const {
338  const ColumnarResults* result{nullptr};
339  {
340  std::lock_guard<std::mutex> columnar_conversion_guard(columnar_conversion_mutex_);
341  if (columnarized_table_cache_.empty() || !columnarized_table_cache_.count(table_id)) {
342  columnarized_table_cache_.insert(std::make_pair(
343  table_id, std::unordered_map<int, std::shared_ptr<const ColumnarResults>>()));
344  }
345  auto& frag_id_to_result = columnarized_table_cache_[table_id];
346  int frag_id = 0;
347  if (frag_id_to_result.empty() || !frag_id_to_result.count(frag_id)) {
348  frag_id_to_result.insert(
349  std::make_pair(frag_id,
350  std::shared_ptr<const ColumnarResults>(columnarize_result(
351  executor_->row_set_mem_owner_, buffer, frag_id))));
352  }
353  CHECK_NE(size_t(0), columnarized_table_cache_.count(table_id));
354  result = columnarized_table_cache_[table_id][frag_id].get();
355  }
356  CHECK_GE(col_id, 0);
358  col_id,
359  &executor_->getCatalog()->getDataMgr(),
360  memory_level,
361  device_id,
362  device_allocator);
363 }
int get_table_id() const
Definition: Analyzer.h:194
#define CHECK_EQ(x, y)
Definition: Logger.h:205
std::vector< int > ChunkKey
Definition: types.h:37
HOST DEVICE int get_size() const
Definition: sqltypes.h:340
std::string cat(Ts &&...args)
static std::unique_ptr< ColumnarResults > mergeResults(const std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner, const std::vector< std::unique_ptr< ColumnarResults >> &sub_results)
virtual void copyToDevice(int8_t *device_dst, const int8_t *host_src, const size_t num_bytes) const =0
const SQLTypeInfo get_column_type(const int col_id, const int table_id, const ColumnDescriptor *cd, const TemporaryTables *temporary_tables)
Definition: Execute.h:199
#define CHECK_GE(x, y)
Definition: Logger.h:210
virtual int8_t * alloc(const size_t num_bytes)=0
ColumnCacheMap columnarized_table_cache_
std::shared_ptr< ResultSet > ResultSetPtr
Executor * executor_
const int8_t * col_buff
#define CHECK_GT(x, y)
Definition: Logger.h:209
static JoinColumn makeJoinColumn(Executor *executor, const Analyzer::ColumnVar &hash_col, const std::vector< Fragmenter_Namespace::FragmentInfo > &fragments, const Data_Namespace::MemoryLevel effective_mem_lvl, const int device_id, DeviceAllocator *device_allocator, std::vector< std::shared_ptr< Chunk_NS::Chunk >> &chunks_owner, std::vector< std::shared_ptr< void >> &malloc_owner, ColumnCacheMap &column_cache)
Creates a JoinColumn struct containing an array of JoinChunk structs.
std::unordered_map< InputColDescriptor, std::unique_ptr< const ColumnarResults > > columnarized_scan_table_cache_
const int8_t * getOneTableColumnFragment(const int table_id, const int frag_id, const int col_id, const std::map< int, const TableFragments * > &all_tables_fragments, std::list< std::shared_ptr< Chunk_NS::Chunk >> &chunk_holder, std::list< ChunkIter > &chunk_iter_holder, const Data_Namespace::MemoryLevel memory_level, const int device_id, DeviceAllocator *device_allocator) const
const ResultSetPtr & get_temporary_table(const TemporaryTables *temporary_tables, const int table_id)
Definition: Execute.h:191
std::mutex columnar_conversion_mutex_
const int8_t * getResultSetColumn(const InputColDescriptor *col_desc, const Data_Namespace::MemoryLevel memory_level, const int device_id, DeviceAllocator *device_allocator) const
const ColumnDescriptor * get_column_descriptor_maybe(const int col_id, const int table_id, const Catalog_Namespace::Catalog &cat)
Definition: Execute.h:183
void * checked_malloc(const size_t size)
Definition: checked_alloc.h:44
#define CHECK_NE(x, y)
Definition: Logger.h:206
const size_t size() const
Used by Fragmenter classes to store info about each fragment - the fragment id and number of tuples(r...
Definition: Fragmenter.h:78
int getColId() const
int getTableId() const
static std::shared_ptr< Chunk > getChunk(const ColumnDescriptor *cd, DataMgr *data_mgr, const ChunkKey &key, const MemoryLevel mem_level, const int deviceId, const size_t num_bytes, const size_t num_elems)
Definition: Chunk.cpp:28
ColumnFetcher(Executor *executor, const ColumnCacheMap &column_cache)
const ChunkMetadataMap & getChunkMetadataMap() const
const int8_t * getAllTableColumnFragments(const int table_id, const int col_id, const std::map< int, const TableFragments * > &all_tables_fragments, const Data_Namespace::MemoryLevel memory_level, const int device_id, DeviceAllocator *device_allocator) const
const SQLTypeInfo & get_type_info() const
Definition: Analyzer.h:78
std::unordered_map< int, std::unordered_map< int, std::shared_ptr< const ColumnarResults >>> ColumnCacheMap
InputSourceType getSourceType() const
#define CHECK_LT(x, y)
Definition: Logger.h:207
static const int8_t * transferColumnIfNeeded(const ColumnarResults *columnar_results, const int col_id, Data_Namespace::DataMgr *data_mgr, const Data_Namespace::MemoryLevel memory_level, const int device_id, DeviceAllocator *device_allocator)
const ColumnarResults * columnarize_result(std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner, const ResultSetPtr &result, const int frag_id)
Definition: Execute.h:235
#define CHECK(condition)
Definition: Logger.h:197
static std::pair< const int8_t *, size_t > getOneColumnFragment(Executor *executor, const Analyzer::ColumnVar &hash_col, const Fragmenter_Namespace::FragmentInfo &fragment, const Data_Namespace::MemoryLevel effective_mem_lvl, const int device_id, DeviceAllocator *device_allocator, std::vector< std::shared_ptr< Chunk_NS::Chunk >> &chunks_owner, ColumnCacheMap &column_cache)
Gets one chunk&#39;s pointer and element count on either CPU or GPU.
int get_column_id() const
Definition: Analyzer.h:195
size_t num_elems
const std::vector< int8_t * > & getColumnBuffers() const
const InputDescriptor & getScanDesc() const
const ColumnDescriptor * get_column_descriptor(const int col_id, const int table_id, const Catalog_Namespace::Catalog &cat)
Definition: Execute.h:154
const SQLTypeInfo & getColumnType(const int col_id) const