OmniSciDB  04ee39c94c
ColumnFetcher.cpp
/*
 * Copyright 2017 MapD Technologies, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

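/**
 * @file ColumnFetcher.cpp
 * @brief Fetches raw column buffers for query execution. Columns of physical
 * tables are pinned as chunks through the buffer manager; columns of temporary
 * (result set) tables are columnarized on demand and cached; either kind is
 * copied to the GPU when the requested memory level calls for it.
 */
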
#include "ColumnFetcher.h"
#include "Execute.h"

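// Binds the fetcher to an executor and seeds the per-query columnarization cache.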
ColumnFetcher::ColumnFetcher(Executor* executor, const ColumnCacheMap& column_cache)
    : executor_(executor), columnarized_table_cache_(column_cache) {}

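// Fetches one fragment of hash_col at the given memory level. A column backed by a
// physical table is pinned via Chunk_NS::Chunk::getChunk, with ownership recorded in
// chunks_owner; a column of a temporary table is columnarized first, under a global
// conversion mutex, and memoized in column_cache. Returns the raw buffer and the
// fragment's tuple count, or {nullptr, 0} for an empty physical fragment.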
std::pair<const int8_t*, size_t> ColumnFetcher::getOneColumnFragment(
    Executor* executor,
    const Analyzer::ColumnVar& hash_col,
    const Fragmenter_Namespace::FragmentInfo& fragment,
    const Data_Namespace::MemoryLevel effective_mem_lvl,
    const int device_id,
    std::vector<std::shared_ptr<Chunk_NS::Chunk>>& chunks_owner,
    ColumnCacheMap& column_cache) {
  static std::mutex columnar_conversion_mutex;
  if (fragment.isEmptyPhysicalFragment()) {
    return {nullptr, 0};
  }
  auto chunk_meta_it = fragment.getChunkMetadataMap().find(hash_col.get_column_id());
  CHECK(chunk_meta_it != fragment.getChunkMetadataMap().end());
  const auto& catalog = *executor->getCatalog();
  const auto cd = get_column_descriptor_maybe(
      hash_col.get_column_id(), hash_col.get_table_id(), catalog);
  CHECK(!cd || !(cd->isVirtualCol));
  const int8_t* col_buff = nullptr;
  if (cd) {
    ChunkKey chunk_key{catalog.getCurrentDB().dbId,
                       fragment.physicalTableId,
                       hash_col.get_column_id(),
                       fragment.fragmentId};
    const auto chunk = Chunk_NS::Chunk::getChunk(
        cd,
        &catalog.getDataMgr(),
        chunk_key,
        effective_mem_lvl,
        effective_mem_lvl == Data_Namespace::CPU_LEVEL ? 0 : device_id,
        chunk_meta_it->second.numBytes,
        chunk_meta_it->second.numElements);
    chunks_owner.push_back(chunk);
    CHECK(chunk);
    auto ab = chunk->get_buffer();
    CHECK(ab->getMemoryPtr());
    col_buff = reinterpret_cast<int8_t*>(ab->getMemoryPtr());
  } else {
    const ColumnarResults* col_frag{nullptr};
    {
      std::lock_guard<std::mutex> columnar_conversion_guard(columnar_conversion_mutex);
      const auto table_id = hash_col.get_table_id();
      const auto frag_id = fragment.fragmentId;
      if (column_cache.empty() || !column_cache.count(table_id)) {
        column_cache.insert(std::make_pair(
            table_id, std::unordered_map<int, std::shared_ptr<const ColumnarResults>>()));
      }
      auto& frag_id_to_result = column_cache[table_id];
      if (frag_id_to_result.empty() || !frag_id_to_result.count(frag_id)) {
        frag_id_to_result.insert(std::make_pair(
            frag_id,
            std::shared_ptr<const ColumnarResults>(columnarize_result(
                executor->row_set_mem_owner_,
                get_temporary_table(executor->temporary_tables_, hash_col.get_table_id()),
                frag_id))));
      }
      col_frag = column_cache[table_id][frag_id].get();
    }
    col_buff = transferColumnIfNeeded(
        col_frag,
        hash_col.get_column_id(),
        &catalog.getDataMgr(),
        effective_mem_lvl,
        effective_mem_lvl == Data_Namespace::CPU_LEVEL ? 0 : device_id);
  }
  return {col_buff, fragment.getNumTuples()};
}

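// Fetches every fragment of hash_col on CPU and concatenates the fragments into one
// contiguous buffer of total_elem_count * elem_width bytes. The buffer comes from
// checked_malloc, so ownership of the returned pointer passes to the caller.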
std::pair<const int8_t*, size_t> ColumnFetcher::getAllColumnFragments(
    Executor* executor,
    const Analyzer::ColumnVar& hash_col,
    const std::deque<Fragmenter_Namespace::FragmentInfo>& fragments,
    std::vector<std::shared_ptr<Chunk_NS::Chunk>>& chunks_owner,
    ColumnCacheMap& column_cache) {
  CHECK(!fragments.empty());
  const size_t elem_width = hash_col.get_type_info().get_size();
  std::vector<const int8_t*> col_frags;
  std::vector<size_t> elem_counts;
  for (auto& frag : fragments) {
    const int8_t* col_frag = nullptr;
    size_t elem_count = 0;
    std::tie(col_frag, elem_count) = getOneColumnFragment(executor,
                                                          hash_col,
                                                          frag,
                                                          Data_Namespace::CPU_LEVEL,
                                                          0,
                                                          chunks_owner,
                                                          column_cache);
    if (col_frag == nullptr) {
      continue;
    }
    CHECK_NE(elem_count, size_t(0));
    col_frags.push_back(col_frag);
    elem_counts.push_back(elem_count);
  }
  if (col_frags.empty()) {
    return {nullptr, 0};
  }
  CHECK_EQ(col_frags.size(), elem_counts.size());
  const auto total_elem_count =
      std::accumulate(elem_counts.begin(), elem_counts.end(), size_t(0));
  auto col_buff =
      reinterpret_cast<int8_t*>(checked_malloc(total_elem_count * elem_width));
  for (size_t i = 0, offset = 0; i < col_frags.size(); ++i) {
    memcpy(col_buff + offset, col_frags[i], elem_counts[i] * elem_width);
    offset += elem_counts[i] * elem_width;
  }
  return {col_buff, total_elem_count};
}

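// Fetches one fragment of a physical table column. Fixed-width columns return the
// chunk buffer directly. Varlen columns (none-encoded strings and arrays) instead
// return a pointer to a ChunkIter kept alive in chunk_iter_holder; for GPU queries
// the iterator itself is copied to the device.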
const int8_t* ColumnFetcher::getOneTableColumnFragment(
    const int table_id,
    const int frag_id,
    const int col_id,
    const std::map<int, const TableFragments*>& all_tables_fragments,
    std::list<std::shared_ptr<Chunk_NS::Chunk>>& chunk_holder,
    std::list<ChunkIter>& chunk_iter_holder,
    const Data_Namespace::MemoryLevel memory_level,
    const int device_id) const {
  static std::mutex varlen_chunk_mutex;  // TODO(alex): remove
  static std::mutex chunk_list_mutex;
  const auto fragments_it = all_tables_fragments.find(table_id);
  CHECK(fragments_it != all_tables_fragments.end());
  const auto fragments = fragments_it->second;
  const auto& fragment = (*fragments)[frag_id];
  if (fragment.isEmptyPhysicalFragment()) {
    return nullptr;
  }
  std::shared_ptr<Chunk_NS::Chunk> chunk;
  auto chunk_meta_it = fragment.getChunkMetadataMap().find(col_id);
  CHECK(chunk_meta_it != fragment.getChunkMetadataMap().end());
  CHECK(table_id > 0);
  const auto& cat = *executor_->getCatalog();
  auto cd = get_column_descriptor(col_id, table_id, cat);
  CHECK(cd);
  const auto col_type =
      get_column_type(col_id, table_id, cd, executor_->temporary_tables_);
  const bool is_real_string =
      col_type.is_string() && col_type.get_compression() == kENCODING_NONE;
  const bool is_varlen =
      is_real_string ||
      col_type.is_array();  // TODO: should it be col_type.is_varlen_array() ?
  {
    ChunkKey chunk_key{
        cat.getCurrentDB().dbId, fragment.physicalTableId, col_id, fragment.fragmentId};
    std::unique_ptr<std::lock_guard<std::mutex>> varlen_chunk_lock;
    if (is_varlen) {
      varlen_chunk_lock.reset(new std::lock_guard<std::mutex>(varlen_chunk_mutex));
    }
    chunk = Chunk_NS::Chunk::getChunk(
        cd,
        &cat.getDataMgr(),
        chunk_key,
        memory_level,
        memory_level == Data_Namespace::CPU_LEVEL ? 0 : device_id,
        chunk_meta_it->second.numBytes,
        chunk_meta_it->second.numElements);
    std::lock_guard<std::mutex> chunk_list_lock(chunk_list_mutex);
    chunk_holder.push_back(chunk);
  }
  if (is_varlen) {
    CHECK_GT(table_id, 0);
    CHECK(chunk_meta_it != fragment.getChunkMetadataMap().end());
    chunk_iter_holder.push_back(chunk->begin_iterator(chunk_meta_it->second));
    auto& chunk_iter = chunk_iter_holder.back();
    if (memory_level == Data_Namespace::CPU_LEVEL) {
      return reinterpret_cast<int8_t*>(&chunk_iter);
    } else {
      CHECK_EQ(Data_Namespace::GPU_LEVEL, memory_level);
      auto& data_mgr = cat.getDataMgr();
      auto chunk_iter_gpu = CudaAllocator::alloc(&data_mgr, sizeof(ChunkIter), device_id);
      copy_to_gpu(&data_mgr,
                  reinterpret_cast<CUdeviceptr>(chunk_iter_gpu),
                  &chunk_iter,
                  sizeof(ChunkIter),
                  device_id);
      return chunk_iter_gpu;
    }
  } else {
    auto ab = chunk->get_buffer();
    CHECK(ab->getMemoryPtr());
    return ab->getMemoryPtr();  // @TODO(alex) change to use ChunkIter
  }
}

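// Fetches an entire physical table column: each fragment is read on CPU and
// columnarized, the per-fragment results are merged, and the merged column is cached
// per InputColDescriptor in columnarized_scan_table_cache_ so repeated requests reuse
// it. The cached column is then transferred to the requested memory level.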
const int8_t* ColumnFetcher::getAllTableColumnFragments(
    const int table_id,
    const int col_id,
    const std::map<int, const TableFragments*>& all_tables_fragments,
    const Data_Namespace::MemoryLevel memory_level,
    const int device_id) const {
  const auto fragments_it = all_tables_fragments.find(table_id);
  CHECK(fragments_it != all_tables_fragments.end());
  const auto fragments = fragments_it->second;
  const auto frag_count = fragments->size();
  std::vector<std::unique_ptr<ColumnarResults>> column_frags;
  const ColumnarResults* table_column = nullptr;
  const InputColDescriptor col_desc(col_id, table_id, int(0));
  CHECK(col_desc.getScanDesc().getSourceType() == InputSourceType::TABLE);
  {
    std::lock_guard<std::mutex> columnar_conversion_guard(columnar_conversion_mutex_);
    auto column_it = columnarized_scan_table_cache_.find(col_desc);
    if (column_it == columnarized_scan_table_cache_.end()) {
      for (size_t frag_id = 0; frag_id < frag_count; ++frag_id) {
        std::list<std::shared_ptr<Chunk_NS::Chunk>> chunk_holder;
        std::list<ChunkIter> chunk_iter_holder;
        const auto& fragment = (*fragments)[frag_id];
        if (fragment.isEmptyPhysicalFragment()) {
          continue;
        }
        auto chunk_meta_it = fragment.getChunkMetadataMap().find(col_id);
        CHECK(chunk_meta_it != fragment.getChunkMetadataMap().end());
        auto col_buffer = getOneTableColumnFragment(table_id,
                                                    static_cast<int>(frag_id),
                                                    col_id,
                                                    all_tables_fragments,
                                                    chunk_holder,
                                                    chunk_iter_holder,
                                                    Data_Namespace::CPU_LEVEL,
                                                    int(0));
        column_frags.push_back(
            boost::make_unique<ColumnarResults>(executor_->row_set_mem_owner_,
                                                col_buffer,
                                                fragment.getNumTuples(),
                                                chunk_meta_it->second.sqlType));
      }
      auto merged_results =
          ColumnarResults::mergeResults(executor_->row_set_mem_owner_, column_frags);
      table_column = merged_results.get();
      columnarized_scan_table_cache_.emplace(col_desc, std::move(merged_results));
    } else {
      table_column = column_it->second.get();
    }
  }
  return transferColumnIfNeeded(
      table_column, 0, &executor_->getCatalog()->getDataMgr(), memory_level, device_id);
}

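// Convenience overload: resolves the temporary table behind col_desc and forwards to
// the ResultSetPtr overload below.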
const int8_t* ColumnFetcher::getResultSetColumn(
    const InputColDescriptor* col_desc,
    const Data_Namespace::MemoryLevel memory_level,
    const int device_id) const {
  CHECK(col_desc);
  const auto table_id = col_desc->getScanDesc().getTableId();
  return getResultSetColumn(get_temporary_table(executor_->temporary_tables_, table_id),
                            table_id,
                            col_desc->getColId(),
                            memory_level,
                            device_id);
}

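// Returns the CPU buffer for col_id as-is; for GPU_LEVEL, allocates device memory via
// CudaAllocator::alloc and copies the column over before returning the device pointer.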
const int8_t* ColumnFetcher::transferColumnIfNeeded(
    const ColumnarResults* columnar_results,
    const int col_id,
    Data_Namespace::DataMgr* data_mgr,
    const Data_Namespace::MemoryLevel memory_level,
    const int device_id) {
  if (!columnar_results) {
    return nullptr;
  }
  const auto& col_buffers = columnar_results->getColumnBuffers();
  CHECK_LT(static_cast<size_t>(col_id), col_buffers.size());
  if (memory_level == Data_Namespace::GPU_LEVEL) {
    const auto& col_ti = columnar_results->getColumnType(col_id);
    const auto num_bytes = columnar_results->size() * col_ti.get_size();
    auto gpu_col_buffer = CudaAllocator::alloc(data_mgr, num_bytes, device_id);
    copy_to_gpu(data_mgr,
                reinterpret_cast<CUdeviceptr>(gpu_col_buffer),
                col_buffers[col_id],
                num_bytes,
                device_id);
    return gpu_col_buffer;
  }
  return col_buffers[col_id];
}

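// Columnarizes a result set (as a single fragment 0) under the conversion mutex,
// memoizing the conversion in columnarized_table_cache_, then hands the requested
// column to transferColumnIfNeeded.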
const int8_t* ColumnFetcher::getResultSetColumn(
    const ResultSetPtr& buffer,
    const int table_id,
    const int col_id,
    const Data_Namespace::MemoryLevel memory_level,
    const int device_id) const {
  const ColumnarResults* result{nullptr};
  {
    std::lock_guard<std::mutex> columnar_conversion_guard(columnar_conversion_mutex_);
    if (columnarized_table_cache_.empty() || !columnarized_table_cache_.count(table_id)) {
      columnarized_table_cache_.insert(std::make_pair(
          table_id, std::unordered_map<int, std::shared_ptr<const ColumnarResults>>()));
    }
    auto& frag_id_to_result = columnarized_table_cache_[table_id];
    int frag_id = 0;
    if (frag_id_to_result.empty() || !frag_id_to_result.count(frag_id)) {
      frag_id_to_result.insert(
          std::make_pair(frag_id,
                         std::shared_ptr<const ColumnarResults>(columnarize_result(
                             executor_->row_set_mem_owner_, buffer, frag_id))));
    }
    CHECK_NE(size_t(0), columnarized_table_cache_.count(table_id));
    result = columnarized_table_cache_[table_id][frag_id].get();
  }
  CHECK_GE(col_id, 0);
  return transferColumnIfNeeded(
      result, col_id, &executor_->getCatalog()->getDataMgr(), memory_level, device_id);
}

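// A minimal usage sketch (hypothetical caller, not part of this file): a hash join
// builder holding an Executor* and an Analyzer::ColumnVar for the join key might
// fetch one fragment on CPU like this. `executor`, `hash_col`, and `fragment` are
// assumed to come from the surrounding query state.
//
//   std::vector<std::shared_ptr<Chunk_NS::Chunk>> chunks_owner;  // keeps chunks pinned
//   ColumnCacheMap column_cache;                                 // memoizes conversions
//   const int8_t* col_buff = nullptr;
//   size_t elem_count = 0;
//   std::tie(col_buff, elem_count) =
//       ColumnFetcher::getOneColumnFragment(executor,
//                                           hash_col,
//                                           fragment,
//                                           Data_Namespace::CPU_LEVEL,
//                                           /*device_id=*/0,
//                                           chunks_owner,
//                                           column_cache);
//
// chunks_owner must outlive every use of col_buff: the returned pointer aliases the
// pinned chunk buffer (or a cached columnarization) rather than owning its storage.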